具有自定义计数标准的RxJava缓冲区/窗口 [英] RxJava buffer/window with custom counting criteria
问题描述
我有一个Observable,它发出许多对象,我想使用window
或buffer
操作对这些对象进行分组.但是,我希望能够使用自定义条件,而不是指定count
参数来确定窗口中应包含多少个对象.
I have an Observable which is emitting a number of objects and I want to group these objects using the window
or buffer
operations. However, instead of specifying a count
parameter for determining how many objects should be in window I want to be able to use a custom criteria.
例如,假定可观察对象正在发出Message
类的实例,如下所示.
For example suppose the observable is emitting instances of a Message
class like below.
class Message(
val int size: Int
)
我想基于其size
变量而不只是其计数来缓冲或窗口化消息实例.例如,要获取总大小最大为5000的消息窗口.
I would like to buffer or window the message instances based on their size
variable not just their counts. For example, to gain windows of messages with a total size of at most 5000.
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
有一种简单的方法吗?
推荐答案
首先,我不得不承认,我不是RxJava专家. 我只是发现您的问题具有挑战性,并试图找到解决方案.
First I have to confess, that I'm not a RxJava expert. I just found your question challenging and tried to find a solution.
有一个带有参数boundaryIndicator
的window()
函数.如果达到窗口大小,则必须创建一个Publisher
/Flowable
来发出一个项目.
There is a window()
function with a parameter boundaryIndicator
. You have to create a Publisher
/ Flowable
that emits an item, if the window size is reached.
在该示例中,我创建了一个用作boundaryIndicator
的对象windowManager
.在onNext
回调中,我调用windowManager
并为其提供打开新窗口的机会.
In the example I created an object windowManager
that is used as the boundaryIndicator
. In the onNext
callback I invoke the windowManager
and give it a chance to open a new window.
val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}
这篇关于具有自定义计数标准的RxJava缓冲区/窗口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!