NodeJS, promises, streams - processing large CSV files


Question

I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream), and return a promise that resolves when the file has been read to the end and rejects on error.

So, I started with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

};

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as to not create memory pressure (see the rough sketch after this list).
  2. The function passed as the processor param is often going to be async, such as saving the contents of the file to the db via a promise-based library (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.
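To make the throttling requirement concrete, here is a rough sketch of the kind of back-pressure I have in mind; saveBatch is a hypothetical promise-returning processor and batchSize is whatever limit the caller picks:

// Rough sketch only: pause the parser while a batch is being persisted,
// and resume once the promise settles. 'saveBatch' is hypothetical.
function processWithBackpressure(parser, saveBatch, batchSize) {
  return new Promise(function(resolve, reject) {
    var batch = [];

    parser.on('data', function(row) {
      batch.push(row);
      if (batch.length >= batchSize) {
        parser.pause();                  // stop reading while we persist
        saveBatch(batch)
          .then(function() {
            batch = [];
            parser.resume();             // safe to read again
          })
          .catch(reject);
      }
    });

    parser.on('end', function() {
      // flush whatever is left over
      resolve(batch.length ? saveBatch(batch) : undefined);
    });

    parser.on('error', reject);
  });
}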

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise in the readable handler after each read(), which means I create a huge number of promised database operations and eventually fault out because I hit a process memory limit.
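From the spex docs, my understanding of the page() contract is roughly the following (the function names are placeholders, and it's exactly this part that I can't connect to the readable events):

// Rough outline of the page() contract as I understand it (check the spex docs):
// 'source' is called repeatedly; return an array of values/promises to process
// as one page, or return nothing to end the sequence. 'dest' receives each
// settled page before the next one is requested.
db.task(function() {
  return this.page(
    function source(index) {
      // return the next page (array) of promises here, or undefined to finish
    },
    function dest(index, data) {
      // 'data' is the array of resolved values for page 'index'
    }
  );
});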

Does anyone have a working example of this that I can use as a jumping-off point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
};

Does anyone see a potential problem with this approach?

Answer

Find below a complete application that correctly executes the same kind of task as you want: it reads a file as a stream, parses it as a CSV and inserts each row into the database.

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });
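
For completeness, the example assumes a single-column primes table and a plain CSV of numbers; a minimal setup (the table layout, column name and sample data are just assumptions) could look like this:

// Hypothetical setup for the example: a one-column "primes" table and a
// primes.csv file with one number per line.
const fs = require('fs');
const promise = require('bluebird');
const pgp = require('pg-promise')({promiseLib: promise});

const db = pgp("postgres://postgres:password@localhost:5432/test_db");

fs.writeFileSync('primes.csv', '2\n3\n5\n7\n11\n13\n');

db.none('CREATE TABLE IF NOT EXISTS primes(value INTEGER)')
    .then(() => {
        console.log('setup complete');
    })
    .catch(error => {
        console.log('SETUP ERROR:', error);
    });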

Note that the only thing I changed was using the csv-parse library instead of csv, as a better alternative.

Added use of method stream.read from the spex library (https://github.com/vitaly-t/spex), which properly serves a Readable stream for use with promises.
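
To illustrate where that method comes from, here is a rough standalone sketch of the same call through spex directly (the receiver signature reflects my reading of the spex docs, so double-check it there):

// Rough sketch: spex.stream.read(stream, receiver) resolves when the stream
// ends and rejects on error; returning a promise from the receiver delays
// further reads until it settles, which is what provides the back-pressure.
const fs = require('fs');
const promise = require('bluebird');
const spex = require('spex')(promise);
const csv = require('csv-parse');

const rs = fs.createReadStream('primes.csv');

spex.stream.read(rs.pipe(csv()), function (index, data) {
    // 'data' is the batch of rows read on this 'readable' event
    console.log('batch', index, ':', data.length, 'rows');
})
    .then(result => {
        console.log('DONE:', result);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });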
