How to ensure asynchronous code is executed after a stream is finished processing?

Problem description

I have a stream that I process by listening for the data, error, and end events, and I call a function to process each data event in the first stream. Naturally, the function processing the data calls other callbacks, making it asynchronous. So how do I start executing more code when the data in the stream is processed? Listening for the end event on the stream does NOT mean the asynchronous data processing functions have finished.

How can I ensure that the stream data processing functions are finished when I execute my next statement?

Here is an example:

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var self = this;
  var promises = [];
  accountStream
    .on('data', function (account) {
      migrateAccount.bind(self)(account, finishMigration);
    })
    .on('error', function (err) {
      return console.log(err);
    })
    .on('end', function () {
      console.log("Finished updating account stream (but finishMigration is still running!!!)");
      callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running!
    });
}

var migrateAccount = function (oldAccount, callback) {
  executeSomeAction(oldAccount, function(err, newAccount) {
    if (err) return console.log("error received:", err);
    return callback(newAccount);
  });
}

var finishMigration = function (newAccount) {
  // some code that is executed asynchronously...
}

How can I ensure that callThisOnlyAfterAllAccountsAreMigrated is called only after the stream data has been processed?

Can this be done with promises? Can it be done with through streams? I am working with Node.js, so referencing other npm modules could be helpful.

Recommended answer

As you said, listening for the end event on the stream is useless on its own. The stream doesn't know or care what you're doing with the data in your data handler, so you would need to write some code to keep track of your own migrateAccount state.

If it were me, I would rewrite this whole section. If you use the readable event with .read() on your stream, you can read as many items at a time as you feel like dealing with. If that's one, no problem. If it's 30, great. The reason to do this is so that you won't overrun whatever is doing the work on the data coming from the stream. As it stands, if accountStream is fast, your application will undoubtedly crash at some point.
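
Here is a rough sketch of that pattern, assuming accountStream is a Readable stream in object mode and reusing the question's migrateAccount and finishMigration functions; it pulls one account at a time, so a fast stream cannot outrun the migration work:

function updateAccountStream (accountStream) {
  var busy = false;

  function pullNext () {
    var account = accountStream.read(); // returns null when the internal buffer is empty
    if (account === null) {
      busy = false;                     // nothing buffered; wait for the next 'readable' event
      return;
    }
    busy = true;
    migrateAccount(account, function (newAccount) {
      finishMigration(newAccount);
      pullNext();                       // only pull another account once this one is done
    });
  }

  accountStream
    .on('readable', function () {
      if (!busy) pullNext();            // new data (or end-of-stream) has arrived
    })
    .on('error', function (err) {
      console.log(err);
    });
}

Note that 'end' still only tells you the stream itself is drained; knowing when the last migration has finished is what the promise or counter bookkeeping below is for.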

When you read an item from a stream and start work, take the promise you get back (use Bluebird or similar) and throw it into an array. When the promise is resolved, remove it from the array. When the stream ends, attach a .done() handler to Promise.all() over the array (basically making one big promise out of every promise still in the array).
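
A hedged sketch of that idea using Bluebird, keeping the question's data handler for brevity (the promise-wrapping code is illustrative, not a drop-in implementation):

var Promise = require('bluebird');

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var inFlight = [];

  accountStream
    .on('data', function (account) {
      // Wrap one migration in a promise and track it.
      var migration = new Promise(function (resolve) {
        migrateAccount(account, function (newAccount) {
          finishMigration(newAccount);
          resolve(newAccount);
        });
      });
      inFlight.push(migration);
      // Drop settled promises so the array only holds work still in progress.
      migration.then(function () {
        inFlight.splice(inFlight.indexOf(migration), 1);
      });
    })
    .on('error', function (err) {
      console.log(err);
    })
    .on('end', function () {
      // 'end' only means the stream has stopped emitting; the migrations may
      // still be running, so wait for everything still in the array.
      Promise.all(inFlight)
        .then(callThisOnlyAfterAllAccountsAreMigrated)
        .done();
    });
}

Promise.all() takes a snapshot of the array at the moment 'end' fires, so removing already-settled promises beforehand does not affect the final wait.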

You could also use a simple counter for jobs in progress.
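
For completeness, a minimal counter-based sketch (no promise library), again built around the question's functions:

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var pending = 0;
  var streamEnded = false;

  function maybeFinish () {
    // Only finish once the stream has ended AND every migration has called back.
    if (streamEnded && pending === 0) {
      callThisOnlyAfterAllAccountsAreMigrated();
    }
  }

  accountStream
    .on('data', function (account) {
      pending++;
      migrateAccount(account, function (newAccount) {
        finishMigration(newAccount);
        pending--;
        maybeFinish();
      });
    })
    .on('error', function (err) {
      console.log(err);
    })
    .on('end', function () {
      streamEnded = true;
      maybeFinish();
    });
}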
