在 RxJs 中实现一个带有固定堆栈的排队系统 [英] implement a queueing system with a fixed stack in RxJs

查看:44
本文介绍了在 RxJs 中实现一个带有固定堆栈的排队系统的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我想要一个队列,其中任何时候只有 3 个项目被异步处理,我该如何处理?

Say I want a queue where only 3 items are being processed asynchronously at any one time, how do I go about this?

这就是我的意思:如果我有一组项目要上传到后端,即上传一些人工制品到云存储,然后创建/更新文档以反映每个人工制品的 url 和我不想:

This is what I mean: If I have a collection of items to upload to the backend i.e. upload some artefacts to cloud storage and consequently create/update a doc to reflect the url of each artefact and I don't want to:

  1. 在下一次之前异步/等待每个上传操作 - 因为这会很慢
  2. 同时发送所有内容 - 这可能会导致写入热点或速率限制
  3. 进行一次 promise.race - 这最终导致 (2)
  4. 做一个 promise.all - 如果有一个长时间运行的上传,这个过程会变慢.

我想做的是:

  1. 有一个所有上传的队列,比如使用 RxJs 创建方法,例如from(array-of-upload-items),同时处理 3 个项目的堆栈.
  2. 当一个项目离开堆栈时,即完成时,我们将一个新项目添加到队列中
  3. 确保在任何时候,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈为止.
  1. Have a queue of all the uploads, say with an RxJs create method e.g. from(array-of-upload-items) with a stack of 3 items being processed at any one time.
  2. And when one item leaves the stack i.e. completes, we add a new item to the queue
  3. Ensure that at any one point, there are always 3 items in the stack being processed until there are no more items in the queue waiting to be put in the stack.

我将如何使用 RxJs 解决这个问题?

How would I go about this using RxJs?

2020 年 6 月 27 日

这是我的想法:

 const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so

      rxQueue
        .pipe(
          mergeMap((item) =>
            of(item).pipe(
              tap(async (item) => {
                  await Promise.race([
                      processUpload(item[0]),
                      processUpload(item[1]),
                      processUpload(item[2]),
                  ])
              }),
            ),
            3
          ),
        )
        .subscribe()

目标是确保在任何时候都处理(上传)了 3 个文件,以便在一个文件上传过程结束时,添加另一个文件以保持堆栈为 3 个上传过程.同理,如果 2 个文件上传同时结束,则将 2 个新文件添加到堆栈中,依此类推,直到文件数组中的所有文件都上传完毕.

The goal is to ensure that at any point, 3 files are being processed (uploaded) so much so that if one file upload process ends, another file is added to keep the stack at 3 upload processes. Same way, if 2 file uploads end at the same time, 2 new files are added to the stack and so on until all the files in the file array are uploaded.

推荐答案

我觉得你可以试试这个:

I think you could try this:

from(filesArray)
  .pipe(
    mergeMap(file => service.uploadFile(file), 3)
  )

这假设 service.uploadFile 返回一个 promise 或一个 observable.

This assumes that service.uploadFile returns a promise or an observable.

假设您有 5 个文件,那么将从前 3 个文件中创建 3 个 observable,当其中一个完成时,将采用第 4 个文件并从中创建一个新的 observable,依此类推.

Let's say you have 5 files, then 3 observables will be created from the first 3 and when one of them completes, the 4th file will be taken and a new observable will be created from it and so on.

这篇关于在 RxJs 中实现一个带有固定堆栈的排队系统的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆