如何从单独的流(一些从原始流中创建)获得统一的 onFinish [英] How to get a unified onFinish from separate streams (some created from within original stream)

查看:32
本文介绍了如何从单独的流(一些从原始流中创建)获得统一的 onFinish的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个这样的流过程:

I have a stream process like this:

Incomming file via HTTP (original stream)
  -> Check if zipfile
    - Yes -> push through an unzip2-stream
    - No -> push to S3

当 unzip2-stream 找到 zip-entries 时,它们将通过相同的流链推送,即

When the unzip2-stream finds zip-entries, these are pushed through the same chain of streams, i.e.

Incomming file entry from zip file ("child" stream)
  -> Check if zipfile
    - Yes -> push through an unzip2-stream
    - No -> push to S3

感谢 https://stackoverflow.com/users/3580261/eljefedelrodeodeljefe 之后我设法解决了主要问题这段对话:如何重定向根据第一个块中的数据流到其他流?

Thanks to https://stackoverflow.com/users/3580261/eljefedelrodeodeljefe I managed to solve the main problem after this conversation: How to redirect a stream to other stream depending on data in first chunk?

为每个 zip 条目创建新的子"流的问题在于,这些流与原始流没有连接,因此我无法为所有流获得统一的 onFinish.

The problem with creating new "child" streams for every zip entry is that these will have no connection to the original stream, so I cannot get a unified onFinish for all the streams.

在处理(解压缩并发送到 S3)每个文件之前,我不想向发件人发送 202.我怎样才能做到这一点?

I don't want to send a 202 of to the sender before I have processed (unzipped and sent to S3) every file. How can I accomplish this?

我在想我可能需要某种控制对象来等待所有子流的 onFinish 并强制进程停留在原始 onFinish 事件中,直到处理完所有文件.这会矫枉过正吗?有没有更简单的解决方案?

I'm thinking that I might need some kind of control object which awaits onFinish for all child streams and forces the process to dwell in the original onFinish event until all files are processed. Would this be overkill? Is there a simpler solution?

推荐答案

我最终为流创建了一个单独的计数器.可能有更好的解决方案,但这是有效的.

I ended up making a separate counter for the streams. There is probably a better solution, but this works.

我将计数器对象作为参数发送到第一次调用我的 saveFile() 函数.计数器会传递给解压缩流,因此它可以传递给每个文件条目的 saveFile.

I send the counter object as an argument to the first call to my saveFile() function. The counter is passed along to the unzip stream so it can be passed to saveFile for every file entry.

  • 就在流开始(即管道)之前,我调用了 streamCounter.streamStarted().
  • 在管道链的最后一个 onFinish 中,我调用了 streamCounter.streamFinished()
  • 如果流变坏,我会调用 streamCounter.streamFailed()

就在我在表单发布路由中发送 202 之前,我等待 streamCounter.streamPromise 解析.

Just before I send the 202 in the form post route I wait for streamCounter.streamPromise to resolve.

我对 setInterval 解决方案不是很自豪.发出某种事件可能会更好.

I'm not very proud of the setInterval solution. It'd probably be better with some kind of event emitting.

module.exports.streamCounter = function() {
  let streamCount = 0;
  let isStarted = false;
  let errors = [];

  this.streamStarted = function(options) {
    isStarted = true;

    streamCount += 1;
    log.debug(`Stream started for ${options.filename}. New streamCount: ${streamCount}`);
  };

  this.streamFinished = function(options) {
    streamCount -= 1;
    log.debug(`Finished stream for ${options.filename}. New streamCount: ${streamCount}`);
  };

  this.streamFailed = function(err) {
    streamCount -= 1;
    errors.push(err);
    log.debug(`Failed stream because (${err.message}). New streamCount: ${streamCount}`);
  };

  this.streamPromise = new Promise(function(resolve, reject) {
    let interval = setInterval(function() {
      if(isStarted && streamCount === 0) {
        clearInterval(interval);

        if(errors.length === 0) {
          log.debug('StreamCounter back on 0. Resolving streamPromise');
          resolve();
        } else {
          log.debug('StreamCounter back on 0. Errors encountered.. Rejecting streamPromise');
          reject(errors[errors.length-1]);
        }
      }
    }, 100);
  });
};

起初我用一个 promise 数组尝试了这个概念,并在发送状态 202 之前等待 Promise.all().但 Promise.all() 就我所知只适用于静态数组.我的streamCount"在流式传输过程中会发生变化,所以我需要一个更动态的Promise.all".

At first I tried this concept with a promise array and waited for Promise.all() before sending status 202. But Promise.all() only works with static arrays as far as I can tell. My "streamCount" is changing during the streaming so I needed a more dynamic "Promise.all".

这篇关于如何从单独的流(一些从原始流中创建)获得统一的 onFinish的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆