NodeJS, promises, streams - processing large CSV files
Question
I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.
This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).
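For context, this is roughly how I expect to call the function from bluebird.map(); the file names, the concurrency value and saveRowToDb below are placeholders for illustration, not part of my actual code:

var promise = require('bluebird');
var fs = require('fs');

// Hypothetical usage sketch: stream several CSV files, at most two at a time,
// where saveRowToDb is a placeholder for an async per-chunk processor.
promise.map(['a.csv', 'b.csv', 'c.csv'], function(fileName) {
    return api.parsers.processCsvStream(fs.createReadStream(fileName), saveRowToDb);
}, {concurrency: 2});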
So, I started with:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
    var parser = csv.parse(passedStream, {trim: true});
    passedStream.pipe(parser);

    // use readable or data event?
    parser.on('readable', function() {
        // call processor, which may be async
        // how do I throttle the amount of promises generated
    });

    var db = pgp(api.config.mailroom.fileMakerDbConfig);

    return new Promise(function(resolve, reject) {
        parser.on('end', resolve);
        parser.on('error', reject);
    });
};
Now, I have two inter-related issues:
- I need to throttle the actual amount of data being processed, so as to not create memory pressure.
- The function passed as the processor param will often be async, such as saving the contents of the file to the db via a promise-based library (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.
The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise in the readable handler after each read(), which means I create a huge number of promised database operations and eventually fault out because I hit a process memory limit.
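To make the question concrete, here is the general shape of the pattern I'm after (a minimal sketch only, not pg-promise specific; it assumes processor takes a single row and returns a promise):

// Sketch: process one row at a time, pausing the parser while the processor's
// promise is pending so rows don't pile up in memory. `parser`, `processor` and
// `promise` (bluebird) are the same variables as in the function above;
// error handling is simplified.
parser.on('data', function(row) {
    parser.pause();                     // stop 'data' events while this row is processed
    promise.resolve(processor(row))     // bluebird wraps the result, even if processor is synchronous
        .then(function() {
            parser.resume();            // let the next row through
        })
        .catch(function(err) {
            parser.emit('error', err);  // surface processing failures to the 'error' handler
        });
});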
Does anyone have a working example of this that I can use as a jumping-off point?
UPDATE: Probably more than one way to skin the cat, but this works:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
    // some checks trimmed out for example
    var db = pgp(api.config.mailroom.fileMakerDbConfig);
    var parser = csv.parse(passedStream, {trim: true});
    passedStream.pipe(parser);

    var readDataFromStream = function(index, data, delay) {
        var records = [];
        var record;
        do {
            record = parser.read();
            if (record != null)
                records.push(record);
        } while (record != null && (records.length < api.config.mailroom.fileParserConcurrency));
        parser.pause();
        if (records.length)
            return records;
    };

    var processData = function(index, data, delay) {
        console.log('processData(' + index + ') > data: ', data);
        parser.resume();
    };

    parser.on('readable', function() {
        db.task(function(tsk) {
            this.page(readDataFromStream, processData);
        });
    });

    return new Promise(function(resolve, reject) {
        parser.on('end', resolve);
        parser.on('error', reject);
    });
};
Does anyone see a potential problem with this approach?
Recommended answer
Find below a complete application that correctly executes the same kind of task as you want: It reads a file as a stream, parses it as a CSV and inserts each row into the database.
const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});
const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');
const db = pgp(cn);
function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
        return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
    })
    .then(data => {
        console.log('DATA:', data);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });
Note that the only thing I changed: using the csv-parse library instead of csv, as a better alternative.
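For reference, a minimal standalone sketch of how the csv-parse parser is wired up (this assumes the same csv-parse API as used above, where require('csv-parse') returns the parser factory; the file name is just a placeholder):

const parse = require('csv-parse');
const fs = require('fs');

// Each parsed record arrives as an array of column values (strings by default).
fs.createReadStream('primes.csv')
    .pipe(parse({trim: true}))
    .on('data', record => console.log(record))
    .on('error', err => console.error(err));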
Added use of method stream.read from the spex library (https://github.com/vitaly-t/spex), which properly serves a Readable stream for use with promises.