nodejs: read from file and store to db, limit maximum concurrent db operations


Question

I have a CSV file that I am reading in as a stream, and using transforms to convert to JSON and then asynchronously store each line to a DB.

The issue is that reading from the file is fast, and so leads to very large numbers of concurrent async DB operations, which causes the app to grind to a halt.

I'd like to limit the app such that a max of N outstanding DB operations are in progress at any given time.

This is the basic core of my _transform function:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};

I've looked at a few options, but these seemed the best candidates:

  • Use the async api 'forEachLimit'
    • The problem I see here is that in my stream transform, I am only operating on one object (one line from the file) at a time when issuing operations.
    • Reading the whole file in isn't feasible due to its size (see the sketch just after this list).
  • Use an asynchronous, parallel, concurrency limited solution as described here, in section 7.2.3:
    • http://book.mixu.net/node/ch7.html
    • The problem for me here is what to do when the 'limit' is reached.
    • Spinning or using setTimeout seems to use up all of the scheduled time and prevents my DB callbacks, which should decrement the 'running' counter, from ever being initiated.
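
For context, a minimal sketch of how eachLimit (forEachLimit is the older alias) would be used if the whole file could be held in memory. The allRows array here is hypothetical; in my case it can't be built because of the file size, which is why this option doesn't fit:

var async = require('async');

// Hypothetical: allRows would have to be the entire parsed file, held in memory.
async.eachLimit(allRows, 100, function(tick, callback) {
  // at most 100 db.set calls are in flight at any one time
  db.set(tick.date, tick, callback);
}, function(err) {
  if (err) throw err;
  console.log("all rows stored");
});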

This was my initial attempt at the 'concurrency limited solution':

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    db.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
  } else {
    console.log("max limit reached, sleeping");
    // schedule a retry rather than invoking _transform immediately
    setTimeout(this._transform.bind(this, data, encoding, done), 1000);
  }
  done();
};

I've only just started with node.js this week, so I'm not clear on what the correct model for solving this is.

Note: A couple of things I'm aware of: this should at least use an exponential backoff if I go with the latter model, and there should be some 'max backoffs' cap in place so as not to blow out the call stack. I've tried to keep it simple here for now, though.
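
For illustration, a minimal sketch of what such a capped exponential backoff could look like. The setWithRetry helper, MAX_RETRIES, and BASE_DELAY_MS are hypothetical names, not part of my actual code; it reuses the running/limit/db variables from the snippet above:

var MAX_RETRIES = 5;      // hypothetical cap so retries can't grow unbounded
var BASE_DELAY_MS = 100;  // hypothetical initial backoff delay

function setWithRetry(tick, attempt, callback) {
  if (running < limit) {
    running++;
    db.set(tick.date, tick, function(err, result) {
      running--;
      callback(err, result);
    });
  } else if (attempt < MAX_RETRIES) {
    // back off exponentially: 100ms, 200ms, 400ms, ... then retry
    setTimeout(function() {
      setWithRetry(tick, attempt + 1, callback);
    }, BASE_DELAY_MS * Math.pow(2, attempt));
  } else {
    callback(new Error("max backoff attempts reached"));
  }
}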

Solution

The concurrency-limited solution option is the approach I would take, but instead of implementing it myself I'd just use the async module. Specifically, its queue method.

Something like:

var dbQueue = async.queue(function(tick, callback) {
    db.set(tick.date, tick, function(err, result) {
        console.log(result);
        callback(err, result);
    });
}, 3); // the last arg (3) is the concurrency level; tweak as needed

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    dbQueue.push(tick);

    this.push(tick);
    done();
};

That will limit your db operations to 3 at a time. Additionally, you can use the queue's saturated and empty events to pause/resume your stream to keep things even more limited in terms of resource use (which will be nice if you're reading really large files). That would look something like:

dbQueue.saturated = function() {
    parser.pause();
}

dbQueue.empty = function() {
    parser.resume();
}
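
As a small follow-up (a sketch in the same older-async callback-property style as the snippet above, not part of the original answer): the queue also exposes a drain callback that fires once the last queued task has completed, which is a handy place to do any final work after the whole file has been processed:

dbQueue.drain = function() {
    // every queued db.set has completed at this point
    console.log("all rows have been written to the db");
};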
