如何将Node.js异步流回调转换为异步生成器? [英] How to convert Node.js async streaming callback into an async generator?

查看:73
本文介绍了如何将Node.js异步流回调转换为异步生成器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个函数,可通过回调批量传输数据.

I have a function that streams data in batches via a callback.

每个批次将在提取另一个批次之前等待回调函数,并且整个函数将返回一个承诺,该承诺将在所有批次完成后解决.

Each batch will await the callback function before fetching another batch and the entire function returns a promise that resolves when all batches are finished.

(我正在使用TypeScript注释来提高可读性)

(I'm using TypeScript annotations to help with readability)

async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}

如何将此功能转换为一次生成一个值的异步生成器?

async function* generatorStream(): AsyncIterableIterator<number> {}

事实证明,这是一项艰巨的任务.

This has proven to be quite a difficult task.

我已经解决了这个问题,并且我已经构建了一些可行的方法,但是它非常复杂,我无法证明合并此代码并让团队中的其他人来处理它是合理的.

I've toyed around with this problem and I've built something that works, but its very convoluted and I can't justify merging this code and making others on my team deal with it.

这是我当前的实现方式:

Here's my current implementation:

我正在使用这个帮助器函数,该函数创建了一个延迟的" promise,有助于在回调周围传递promise.

I'm using this helper function that created a "deferred" promise which helps with passing promises around callbacks.

interface DeferredPromise<T> {
    resolve: (value: T) => void
    reject: (error: any) => void
    promise: Promise<T>
}

function deferred<T>(): DeferredPromise<T> {
    let resolve
    let reject
    const promise = new Promise<T>((res, rej) => {
        resolve = res
        reject = rej
    })
    return {
        resolve: resolve as (value: T) => void,
        reject: reject as (error: any) => void,
        promise,
    }
}

接下来,我有一个逻辑上的逻辑,可以线性化promise回调到一个链中,其中每个promise用next函数解析一个批次,该函数将返回另一个promise获取下一批.

Next I have this hairball of logic that linearizes the promise callbacks into a chain where each promise resolved a batch with next function that will return another promise fetching the next batch.

type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done

async function chainedPromises() {
    let deferred = PromiseUtils.deferred<Result>()

    callbackStream(async batch => {
        const next = PromiseUtils.deferred<null>()
        deferred.resolve({
            done: false,
            value: batch,
            next: () => {
                deferred = PromiseUtils.deferred<Result>()
                next.resolve(null)
                return deferred.promise
            },
        })
        await next.promise
    }).then(() => {
        deferred.resolve({ done: true })
    })

    return deferred.promise
}

从这里开始,创建一次只产生一项的生成器并不是很困难:

From here, creating a generator that yields one item at a time isn't very difficult:

async function* generatorStream(): AsyncIterableIterator<number> {
    let next = chainedPromises
    while (true) {
        const result = await next()
        if (result.done) {
            return
        }
        for (const item of result.value) {
            yield item
        }
        next = result.next
    }
}


我想我们都可以同意中间的chainedPromises函数非常令人困惑和困惑. 是否可以通过一种易于理解且易于遵循的方式将callbackStream转换为generatorStream?我不介意使用一个建立良好的库,但是我会也赞赏从第一原理的简单实现.


I think we can all agree that the intermediate chainedPromises function is very confusing and convoluted. Is there any way I can transform callbackStream into generatorStream in a way that is easy to understand and easy to follow? I don't mind using a library if its well established, but I would also appreciate a simple implementation from first-principles.

推荐答案

您需要一个事件存储桶,下面是一个示例:

You need a event bucket, here is an example:

function bucket() {
  const stack = [],
    iterate = bucket();

  var next;

  async function* bucket() {
    while (true) {
      yield new Promise((res) => {
        if (stack.length > 0) {
          return res(stack.shift());
        }

        next = res;
      });
    }
  }

  iterate.push = (itm) => {
    if (next) {
      next(itm);
      next = false;
      return;
    }

    stack.push(itm);
  }

  return iterate;
}

;
(async function() {
  let evts = new bucket();

  setInterval(() => {
    evts.push(Date.now());
    evts.push(Date.now() + '++');
  }, 1000);

  for await (let evt of evts) {
    console.log(evt);
  }
})();

这篇关于如何将Node.js异步流回调转换为异步生成器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆