NodeJS, promises, streams - processing large CSV files

Question

I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).

So, I started with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as to not create memory pressures.
  2. The function passed as the processor param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise in the 'readable' handler after each read(), which means I create a huge amount of promised database operations and eventually fault out because I hit a process memory limit.
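
For context, here is a minimal sketch of the page() pattern being described (not working code; it assumes the db object from the snippet above, and readNextRows() and the table name are hypothetical placeholders):

// Sketch only: page() resolves one page of insert promises before asking
// the source callback for the next page, which caps how many pending
// promises exist at any one time.
db.task(function(t) {
  return t.page(function(index) {
    var rows = readNextRows(); // hypothetical: pull up to N parsed rows off the CSV parser
    if (rows && rows.length) {
      // one insert promise per row in this page
      return rows.map(function(r) {
        return t.none('insert into some_table values($1)', r[0]); // hypothetical table
      });
    }
    // returning undefined ends the sequence
  });
});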

Does anyone have a working example of this that I can use as a jumping point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Anyone sees a potential problem with this approach?

Answer

Find below a complete application that correctly executes the same kind of task as you want: it reads the file as a stream, parses it as CSV and inserts each row into the database.

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });

Note that the only thing I changed is using the csv-parse library instead of csv, as a better alternative.

Added use of method stream.read from the spex library (https://github.com/vitaly-t/spex), which properly serves a Readable stream for use with promises.
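
For reference, a rough sketch of the stream.read call shape (a sketch only, based on the spex library linked above): the receiver is invoked once per 'readable' event with an array of rows read since the previous call, any promise it returns is awaited before reading continues, and the overall call resolves when the stream ends.

const promise = require('bluebird');
const spex = require('spex')(promise);
const fs = require('fs');
const csv = require('csv-parse');

const rs = fs.createReadStream('primes.csv');

spex.stream.read(rs.pipe(csv()), function (index, data, delay) {
    // index: zero-based call index; data: rows read since the previous call;
    // delay: milliseconds since the previous call.
    // Returning a promise here makes read() wait for it before pulling more data.
    console.log('batch', index, '-', data.length, 'rows');
})
    .then(stats => {
        // resolves with a summary of the read operation
        console.log('finished:', stats);
    })
    .catch(error => {
        console.log('failed:', error);
    });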
