NodeJS - Seeing data events from readable stream without corresponding pause from writable stream


Problem Description

We are seeing some extremely high memory usage with some of our streams in production. The files are stored in S3 and we open a readable stream on the S3 object, and then we pipe that data to a file on our local filesystem (on our EC2 instance). Some of our clients have extremely large files. In one instance they had a file that was over 6GB in size, and the node process that was handling this file was using so much memory that we exhausted almost all of our swap space and the machine slowed to a crawl. Obviously, there is a memory leak somewhere, which is what I'm trying to track down.

In the meantime, I augmented our code a bit to log when we see certain events from the streams. I have the code below and some sample output from the log with a small test file. What perplexes me is the fact that the readable stream receives a pause event and then continues to emit data and pause events WITHOUT the writable stream emitting a drain event. Am I completely missing something here? Once the readable stream is paused, how does it continue to emit data events prior to receiving a drain? The writable stream hasn't indicated that it is ready yet, so the readable stream should not be sending anything...right?

Yet look at the output. The first 3 events make sense to me: data, pause, drain. Then the next 3 are fine: data, data, pause. But THEN it emits another data and another pause event before finally getting a drain as the 9th event. I don't understand why events 7 and 8 occurred since the drain doesn't happen until the 9th event. Then again after the 9th event there are a bunch of data/pause pairs without any corresponding drain. Why? What I would expect is some number of data events, then a pause, and then NOTHING until a drain event occurs -- at which point data events could occur again. It seems to me that once a pause has occurred, no data events should occur at all until a drain event fires. Maybe I still fundamentally misunderstand something about Node streams?
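
For reference, here is a minimal sketch (assumed file paths and variable names, not the code from this question) of the backpressure handshake that pipe() implements internally, which is exactly the model described above: pause the source when write() returns false, then resume it on 'drain'. Under this contract, once pause() has been called you would indeed expect no further data events until a drain fires.

var fs = require('fs');

// hypothetical stand-ins for the S3 readable stream and the local
// file writable stream from the question
var source = fs.createReadStream('/tmp/input.csv');
var dest = fs.createWriteStream('/tmp/output.csv');

source.on('data', function(chunk) {
  // write() returns false once the destination's internal buffer is
  // over its highWaterMark, signaling the source to back off
  if (!dest.write(chunk)) {
    source.pause();
  }
});

dest.on('drain', function() {
  // the destination's buffer has flushed; safe to emit data again
  source.resume();
});

source.on('end', function() {
  dest.end();
});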

UPDATE: The docs do not mention anything about a pause event being emitted by readable streams, but they do mention that a pause function is available. Presumably this gets called when the writable stream's write() returns false, and I would assume the pause function would emit the pause event. In any case, if pause() is invoked, the docs seem to jibe with my view of the world. See https://nodejs.org/docs/v0.10.30/api/stream.html#stream_class_stream_readable

This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.

This test was run on my development machine (Ubuntu 14.04 with Node v0.10.37). Our EC2 instances in prod are almost the same. I think they run v0.10.30 right now.

// assumed requires for this snippet; `_` may be lodash or underscore
// (both provide _.last), and `logger` and `self.newS3()` come from the
// surrounding service module, which is not shown here
var Q = require('q'),
    fs = require('fs'),
    _ = require('lodash');

S3Service.prototype.getFile = function(bucket, key, fileName) {
  var deferred = Q.defer(),
    self = this,
    s3 = self.newS3(),
    fstream = fs.createWriteStream(fileName),
    shortname = _.last(fileName.split('/'));

  logger.debug('Get file from S3 at [%s] and write to [%s]', key, fileName);

  // create a readable stream that will retrieve the file from S3
  var request = s3.getObject({
    Bucket: bucket,
    Key: key
  }).createReadStream();

  // if network request errors out then we need to reject
  request.on('error', function(err) {
      logger.error(err, 'Error encountered on S3 network request');
      deferred.reject(err);
    })
    .on('data', function() {
      logger.info('data event from readable stream for [%s]', shortname);
    })
    .on('pause', function() {
      logger.info('pause event from readable stream for [%s]', shortname);
    });

  // resolve when our writable stream closes, or reject if we get some error
  fstream.on('close', function() {
      logger.info('close event from writable stream for [%s] -- done writing file', shortname);
      deferred.resolve();
    })
    .on('error', function(err) {
      logger.error(err, 'Error encountered writing stream to [%s]', fileName);
      deferred.reject(err);
    })
    .on('drain', function() {
      logger.info('drain event from writable stream for [%s]', shortname);
    });

  // pipe the S3 request stream into a writable file stream
  request.pipe(fstream);

  return deferred.promise;
};
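
For context, a call site for this method might look like the following sketch; the service construction and the bucket/key/path values are hypothetical.

// hypothetical usage; names and arguments are illustrative only
var s3Service = new S3Service();

s3Service.getFile('my-bucket', 'exports/FeedItem.csv', '/tmp/FeedItem.csv')
  .then(function() {
    console.log('file downloaded and written to local disk');
  })
  .fail(function(err) { // Q promises expose .fail for rejections
    console.error('download failed:', err);
  });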

[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.427Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.507Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.514Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.515Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.595Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.596Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.597Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.598Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.601Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.602Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.603Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.627Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.628Z] INFO: worker/7525 on bdmlinux: drain event from writable stream for [FeedItem.csv]
[2015-05-13T17:21:00.688Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.689Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: data event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.690Z] INFO: worker/7525 on bdmlinux: pause event from readable stream for [FeedItem.csv]
[2015-05-13T17:21:00.691Z] INFO: worker/7525 on bdmlinux: close event from writable stream for [FeedItem.csv] -- done writing file

Recommended Answer

It's possible you have some quantum-like "observing the phenomenon changes the outcome" situation here. Node introduced a new way of streaming in v0.10. From the docs:

If you attach a data event listener, then it will switch the stream into flowing mode, and data will be passed to your handler as soon as it is available.

Namely, attaching a data listener switches the stream back to the classic (flowing) streams mode. This may be why you're getting behavior that's inconsistent with what you read in the rest of the docs. To observe things unintrusively, you could try removing your on('data') handler and inserting your own stream in between using the through module, like this:

var through = require('through');

// a simple pass-through: log each chunk, then queue it downstream,
// without attaching a 'data' listener to the S3 request stream itself
var observer = through(function write(data) {
    console.log('Data!');
    this.queue(data);
}, function end() {
    this.queue(null); // signal end-of-stream downstream
});

request.pipe(observer).pipe(fstream);
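
If you would rather not depend on the third-party through module, a core stream.Transform (available in Node v0.10) can act as the same kind of unintrusive observer. This is an assumed equivalent sketch, not part of the original answer:

var Transform = require('stream').Transform;

// pass-through observer built from core streams: logs each chunk and
// forwards it unchanged
var observer = new Transform();
observer._transform = function(chunk, encoding, done) {
  console.log('Data!');
  done(null, chunk); // forward the chunk downstream
};

request.pipe(observer).pipe(fstream);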
