在 RxJS 管道中处理未知数量的可观察对象 [英] Process unknown number of observables in RxJS pipe
问题描述
我对我的服务器进行了数千次调用,但为了避免过载,我对 RxJS 设置了 10 个并发限制:
I make thousands of calls to my server but in order to avoid overloading, I have put in place a concurrency limit of 10 with RxJS:
const calls = new Subject();
calls.pipe(
mergeAll(10)
).subscribe();
while(int < unknown_number){
calls.next(new Observable((observer) => {
// Call the server here
}))
}
问题是我不知道会打多少电话,我需要知道工作何时完成.一种方法是在 5 秒左右的队列中没有任何东西时获取.
The problem is that I don't know how many calls will be made and I need to know when the job is done. One approach is to get when nothing is left in the queue for 5 seconds or so.
我该怎么做?
推荐答案
阅读您的评论后,我认为答案在于针对 Subject
发出 complete
命令作为一旦我们知道没有更多的数据要从数据库中读取.
After reading your comment, I think the answer lays in issuing a complete
command against the Subject
as soon as we know there is no more data to be read from the DB.
所以,是一种伪代码,这可能是解决方案的草案
So, is a sort of pseudo-code, this could be the draft for the solution
// callToServer is a function that calls the server and returns an Observable
const callToServer: Observable<any> = (input) => httpClient.post(url, input);
const calls = new Subject<any>();
calls.pipe(
// margeMap allows to set concurrency level as second parameter
mergeMap(input => callToServer(input), 10)
).subscribe(
{
next: response => {
// do something with the response
},
error: err => {
// manage error occurrences
},
complete: () => {
// here you come when the Subject completes
}
}
);
const cursor = db.readFromMyTable();
while(cursor has still data){
const data = cursor.getNext(100);
// for each record issue a next against the calls Subject
data.forEach(d => calls.next(d));
}
// when there are no more records completes the calls Subject
这篇关于在 RxJS 管道中处理未知数量的可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!