具有自定义计数标准的RxJava缓冲区/窗口 [英] RxJava buffer/window with custom counting criteria

查看:106
本文介绍了具有自定义计数标准的RxJava缓冲区/窗口的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Observable,它发出许多对象,我想使用windowbuffer操作对这些对象进行分组.但是,我希望能够使用自定义条件,而不是指定count参数来确定窗口中应包含多少个对象.

I have an Observable which is emitting a number of objects and I want to group these objects using the window or buffer operations. However, instead of specifying a count parameter for determining how many objects should be in window I want to be able to use a custom criteria.

例如,假定可观察对象正在发出Message类的实例,如下所示.

For example suppose the observable is emitting instances of a Message class like below.

class Message(
   val int size: Int
)

我想基于其size变量而不只是其计数来缓冲或窗口化消息实例.例如,要获取总大小最大为5000的消息窗口.

I would like to buffer or window the message instances based on their size variable not just their counts. For example, to gain windows of messages with a total size of at most 5000.

// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

有一种简单的方法吗?

推荐答案

首先,我不得不承认,我不是RxJava专家. 我只是发现您的问题具有挑战性,并试图找到解决方案.

First I have to confess, that I'm not a RxJava expert. I just found your question challenging and tried to find a solution.

有一个带有参数boundaryIndicatorwindow()函数.如果达到窗口大小,则必须创建一个Publisher/Flowable来发出一个项目.

There is a window() function with a parameter boundaryIndicator. You have to create a Publisher/ Flowable that emits an item, if the window size is reached.

在该示例中,我创建了一个用作boundaryIndicator的对象windowManager.在onNext回调中,我调用windowManager并为其提供打开新窗口的机会.

In the example I created an object windowManager that is used as the boundaryIndicator. In the onNext callback I invoke the windowManager and give it a chance to open a new window.

val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) {
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
    it.doOnNext {
        windowManager.openWindowIfRequired(it)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe {
        println(it)
    }
}

这篇关于具有自定义计数标准的RxJava缓冲区/窗口的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆