在 RxJs 中实现一个带有固定堆栈的排队系统 [英] implement a queueing system with a fixed stack in RxJs
问题描述
假设我想要一个队列,其中任何时候只有 3 个项目被异步处理,我该如何处理?
Say I want a queue where only 3 items are being processed asynchronously at any one time, how do I go about this?
这就是我的意思:如果我有一组项目要上传到后端,即上传一些人工制品到云存储,然后创建/更新文档以反映每个人工制品的 url 和我不想:
This is what I mean: If I have a collection of items to upload to the backend i.e. upload some artefacts to cloud storage and consequently create/update a doc to reflect the url of each artefact and I don't want to:
- 在下一次之前异步/等待每个上传操作 - 因为这会很慢
- 同时发送所有内容 - 这可能会导致写入热点或速率限制
- 进行一次 promise.race - 这最终导致 (2)
- 做一个 promise.all - 如果有一个长时间运行的上传,这个过程会变慢.
我想做的是:
- 有一个所有上传的队列,比如使用 RxJs 创建方法,例如
from(array-of-upload-items)
,同时处理 3 个项目的堆栈. - 当一个项目离开堆栈时,即完成时,我们将一个新项目添加到队列中
- 确保在任何时候,堆栈中始终有 3 个项目正在处理,直到队列中没有更多项目等待放入堆栈为止.
- Have a queue of all the uploads, say with an RxJs create method e.g.
from(array-of-upload-items)
with a stack of 3 items being processed at any one time. - And when one item leaves the stack i.e. completes, we add a new item to the queue
- Ensure that at any one point, there are always 3 items in the stack being processed until there are no more items in the queue waiting to be put in the stack.
我将如何使用 RxJs 解决这个问题?
How would I go about this using RxJs?
2020 年 6 月 27 日
这是我的想法:
const rxQueue = from(filesArray) // this is the list of files to upload say like 25 files or so
rxQueue
.pipe(
mergeMap((item) =>
of(item).pipe(
tap(async (item) => {
await Promise.race([
processUpload(item[0]),
processUpload(item[1]),
processUpload(item[2]),
])
}),
),
3
),
)
.subscribe()
目标是确保在任何时候都处理(上传)了 3 个文件,以便在一个文件上传过程结束时,添加另一个文件以保持堆栈为 3 个上传过程.同理,如果 2 个文件上传同时结束,则将 2 个新文件添加到堆栈中,依此类推,直到文件数组中的所有文件都上传完毕.
The goal is to ensure that at any point, 3 files are being processed (uploaded) so much so that if one file upload process ends, another file is added to keep the stack at 3 upload processes. Same way, if 2 file uploads end at the same time, 2 new files are added to the stack and so on until all the files in the file array are uploaded.
推荐答案
我觉得你可以试试这个:
I think you could try this:
from(filesArray)
.pipe(
mergeMap(file => service.uploadFile(file), 3)
)
这假设 service.uploadFile
返回一个 promise 或一个 observable.
This assumes that service.uploadFile
returns a promise or an observable.
假设您有 5 个文件,那么将从前 3 个文件中创建 3 个 observable,当其中一个完成时,将采用第 4 个文件并从中创建一个新的 observable,依此类推.
Let's say you have 5 files, then 3 observables will be created from the first 3 and when one of them completes, the 4th file will be taken and a new observable will be created from it and so on.
这篇关于在 RxJs 中实现一个带有固定堆栈的排队系统的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!