Rxjs:块和延迟流? [英] Rxjs: Chunk and delay stream?

查看:114
本文介绍了Rxjs:块和延迟流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

简而言之,尝试将一个非常大的数组块分成10块并等待5秒后再发出下一个10块。

In short, trying to chunk a really large array into chunks of 10 and wait 5 seconds before emitting the next 10.

这是我目前拥有的

Rx.Observable
   .from(hugeArray)
   .bufferCount(10) 
   .delay(5000) //want to wait 5 secs
   .flatMap(e => e) // this needs to go after to flatten the array, buffer spits out arrays of entries
   .flatMap( (data, index) => Rx.Observable.create(observer => {
       // going to render stuff here
       observer.onNext(data)
       observer.onCompleted();  

   }))
   .subscribe(val => console.log('Buffered Values:', val));

只是尝试在5秒内完成10个块,只能做一个初始延迟然后它排出其余部分。

Just trying to do 10 chunks ever 5 seconds, only been able to do an initial delay and then it emitted the rest.

推荐答案

您的链条一次性发出一切,然后安排每个块从同一时间开始等待5s,因此延迟已经过去了所有块都在同一时刻。

Your chain just emitted everything at once and then scheduled each chunk to wait 5s starting at the same time so the delay elapsed for all chunks at the exact same moment.

解决方案可能是使用 concatMap()订阅每个Observable一个一个。

Solution could be to use concatMap() that subscribes to each Observable one by one.

Rx.Observable
    .from(hugeArray)
    .bufferCount(10)
    .concatMap(data => Rx.Observable.of(data).delay(5000))
    .flatMap(e => e) // or mergeAll() or concatAll()
    .flatMap( (data, index) => Rx.Observable.create(observer => {
        // going to render stuff here
        observer.onNext(data);
        observer.onCompleted();
    }))
    .subscribe(val => console.log('Buffered Values:', val));

这篇关于Rxjs:块和延迟流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆