如何一次处理 RxJS 流 n 个项目,一旦项目完成,又自动填充回 n 个项目? [英] How to process RxJS stream n items at a time and once an item is done, autofill back to n again?

查看:33
本文介绍了如何一次处理 RxJS 流 n 个项目,一旦项目完成,又自动填充回 n 个项目?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件.

I have a stream of events and I would like to call a function that returns a promise for each of those events, the problem is that this function is very expensive, so I would like to process at most n events at a time.

这个卵石图可能有误,但这是我想要的:

This pebble diagram is probably wrong but this is what I would like:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p------------->  //In Progress
-------d--d--------d-d-dd------dddd-------->  //Promise Done

---1--21-2-34-----------3----4-3210--------   //QUEUE SIZE CHANGES

这是我到目前为止的代码:

This is the code that I have so far:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$      
  .tap(() => inProgressCount++)
  .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
  inProgressCount--;
  inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

这段代码有两个问题:

  1. 它使用的是 inProgressCount 变量,它用 side 更新效果功能.
  2. done$ 订阅仅在我从受控流中请求超过 1 个项目时调用一次.这使得 inProgressCount 变量更新不正确,这最终将队列限制为一次一个.
  1. It's using the inProgressCount var which is updated with side effect functions.
  2. The done$ subscription is only called once when I request more than 1 item from the controlled stream. This is making the inProgressCount var to update incorrectly, this eventually limits the queue to one at a time.

你可以在这里看到它的工作:http://jsbin.com/wivehonifi/1/edit?js,console,output

You can see it working in here: http://jsbin.com/wivehonifi/1/edit?js,console,output

问题:

  1. 有更好的方法吗?
  2. 如何去除 inProgressCount 变量?
  3. 为什么在请求多个项目时 done$ 订阅只被调用一次?

更新:
问题 3 的答案:switchMap 与 flatMapLatest 相同,这就是为什么我只得到最后一个.将代码更新为 flatMap 而不是 switchMap.

Update:
Answer to question #3: switchMap is the same as flatMapLatest, so that's why I was only getting the last one. Updated the code to flatMap instead of switchMap.

推荐答案

您实际上根本不需要使用背压.有一个名为 flatMapWithMaxConcurrent 的运算符可以为您执行此操作.它本质上是调用 .map().merge(concurrency) 的别名,并且它一次只允许最大数量的流传输.

You actually don't need to use backpressure at all. There is an operator called flatMapWithMaxConcurrent that does this for you. It is essentially an alias for calling .map().merge(concurrency) and it only allows a maximum number of streams to be in flight at a time.

我在这里更新了你的 jsbin:http://jsbin.com/weheyuceke/1/编辑?js,输出

I updated your jsbin here: http://jsbin.com/weheyuceke/1/edit?js,output

但我在下面注释了重要的一点:

But I annotated the important bit below:

const concurrency = 4;

var done$ = events$
  //Only allows a maximum number of items to be subscribed to at a time
  .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) =>   
      //This overload of `fromPromise` defers the execution of the lambda
      //until subscription                    
      Rx.Observable.fromPromise(() => { 
        //Notify the ui that this task is in progress                                 
        updatePanelAppend(inProgress, timestamp);
        removeFromPanel(pending, timestamp);
        //return the task
        return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
     }));

这篇关于如何一次处理 RxJS 流 n 个项目,一旦项目完成,又自动填充回 n 个项目?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆