如何在RxJava中延迟发布Collection中的项目? [英] How to emit items from a Collection with delay in RxJava?

查看:118
本文介绍了如何在RxJava中延迟发布Collection中的项目?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

  // //给给定的coll填充一些项目,
//返回发出填充集合的观察值
//做ASYNC工作
Observable< Collection> (fillColl){...}

def coll = []
generate )
))。subscribe({...})

问题在于这个集合可以包含数以千计的项目,并且自从 generate 工作异步,此代码使订阅方法几乎立即被调用几千次(这不是我想要的工作做在内部观察员)。



如何修改此代码以便延迟收集集合中的项目?
例如:发射100个物品,然后等待100ms,然后发射另外100个物品或等待10毫秒才能发射下一个物品?

解决方案

在flatMap中,需要将 filledColl 拆分成较小的部分,延迟每个部分,并将它们合并成一个observable,然后您将在flatMap中返回。

  generate(coll).flatMap({filledColl  - > 
def chunkSize = 100
resultStream = rx.Observable .first()
for(i in 0 ..< filledCol.size()/ chunkSize){
def chunk = filledCol [i * chunkSize ..(i + 1)* chunkSize]
resultStream = resultStream.mergeWith(
rx.Observable.from(chunk).delay(100 * i,TimeUnit.MILLISECONDS)

}
resultStream
})。subscribe({...})

这只是一个粗略的想法,你可能还想要根据您的需求进行测试,调整和修正。此外,将它移入生成函数可能更有意义,但这取决于您,因为我无法知道generate()中的内容。


I have the below code emitting items from a collection.

// fills the given coll with some items and
// returns the observable emitting the filled collection
// does ASYNC work
Observable<Collection> generate(Collection coll){ ... }

def coll = []
generate(coll).flatMap({ filledColl ->
    rx.Observable.from(filledColl)
}).subscribe({ ... })

The problem is that this collection can contain thousands of items and since generate works async, this code causes the subscribe method to be called thousands of times almost instantly (which is not wanted for the work I'm doing inside observer).

How can I modify this code to emit items from collection with a delay? For example: emit 100 items then wait 100ms then emit another 100 items or wait 10ms before emitting next item?

解决方案

Inside the flatMap, you need to split the filledColl into smaller parts, delay each part, and merge them all into one observable which you will return inside the flatMap.

generate(coll).flatMap({ filledColl ->
    def chunkSize = 100
    resultStream = rx.Observable.never()
    for (i in 0 ..< filledCol.size()/chunkSize) {
        def chunk = filledCol[i*chunkSize .. (i+1)*chunkSize]
        resultStream = resultStream.mergeWith(
            rx.Observable.from(chunk).delay(100*i, TimeUnit.MILLISECONDS)
        )
    }
    resultStream
}).subscribe({ ... })

That's just the rough idea, you might still want to test, tweak and correct according to your needs. Also, it might make more sense to move this into the generate function, but that's up to you since I cannot know what is in generate().

这篇关于如何在RxJava中延迟发布Collection中的项目?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆