RxJava 缓冲区直到改变 [英] RxJava buffer until changed

查看:36
本文介绍了RxJava 缓冲区直到改变的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 observable 可以发出大量数据,例如

I have an observable which emits large number of data like

[1,1,1,2,2,2,3,3,1,1,5,5......]

[1,1,1,2,2,2,3,3,1,1,5,5......]

在 RxJava 中,我们可以使用 distinctUntilChanged() 来获取不同的项目,直到它改变了,结果就像

In RxJava we can use distinctUntilChanged() to get a distinct items until it changed and it would result like

[1,2,3,1,5,......]

[1,2,3,1,5,......]

同样,有没有办法缓冲项目直到更改?例如我期望像

Similarly is there a way to buffer the items until changed? for example I expect an output like

[[1,1,1] , [2,2,2] , [3,3] , [1,1] , [5,5]......]

[[1,1,1] , [2,2,2] , [3,3] , [1,1] , [5,5]......]

推荐答案

您可以共享源序列,将 distinctUntilChanged 应用到一个路径,然后将驱动 buffer 运算符使用 Observable 来指示边界:

You can share the source sequence, apply distinctUntilChanged to one path which will then drive a buffer operator that uses an Observable to indicate boundaries:

@Test
@SuppressWarnings("unchecked")
public void test() {
    Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
    .compose(bufferUntilChanged(v -> v))
    .test()
    .assertResult(
            Arrays.asList(1, 1, 1),
            Arrays.asList(2, 2, 2),
            Arrays.asList(3, 3),
            Arrays.asList(1, 1),
            Arrays.asList(5, 5)
        );
}

static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
        Function<T, K> keySelector) {
    return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}

存在 skip(1) 是因为通过 distinctUntilChanged 的第一个项目会触发一个新的缓冲区,使第一个缓冲区为空.

The skip(1) is there because the very first item passing through distinctUntilChanged would trigger a new buffer, having the very first buffer empty.

这篇关于RxJava 缓冲区直到改变的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆