在 RxJS 管道中处理未知数量的可观察对象 [英] Process unknown number of observables in RxJS pipe

查看:47
本文介绍了在 RxJS 管道中处理未知数量的可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对我的服务器进行了数千次调用,但为了避免过载,我对 RxJS 设置了 10 个并发限制:

I make thousands of calls to my server but in order to avoid overloading, I have put in place a concurrency limit of 10 with RxJS:

const calls = new Subject();
calls.pipe(
    mergeAll(10)
).subscribe();

while(int < unknown_number){
    calls.next(new Observable((observer) => {
        // Call the server here
    }))
}

问题是我不知道会打多少电话,我需要知道工作何时完成.一种方法是在 5 秒左右的队列中没有任何东西时获取.

The problem is that I don't know how many calls will be made and I need to know when the job is done. One approach is to get when nothing is left in the queue for 5 seconds or so.

我该怎么做?

推荐答案

阅读您的评论后,我认为答案在于针对 Subject 发出 complete 命令作为一旦我们知道没有更多的数据要从数据库中读取.

After reading your comment, I think the answer lays in issuing a complete command against the Subject as soon as we know there is no more data to be read from the DB.

所以,是一种伪代码,这可能是解决方案的草案

So, is a sort of pseudo-code, this could be the draft for the solution

// callToServer is a function that calls the server and returns an Observable
const callToServer: Observable<any> = (input) => httpClient.post(url, input);

const calls = new Subject<any>();
calls.pipe(
    // margeMap allows to set concurrency level as second parameter
    mergeMap(input => callToServer(input), 10)
).subscribe(
   {
      next: response => {
        // do something with the response
      },
      error: err => {
        // manage error occurrences
      },
      complete: () => {
        // here you come when the Subject completes
      }
   }
);

const cursor = db.readFromMyTable();

while(cursor has still data){
    const data = cursor.getNext(100);
    // for each record issue a next against the calls Subject
    data.forEach(d => calls.next(d));
}

// when there are no more records completes the calls Subject

这篇关于在 RxJS 管道中处理未知数量的可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆