RxJava 缓冲区直到改变 [英] RxJava buffer until changed
问题描述
我有一个 observable 可以发出大量数据,例如
I have an observable which emits large number of data like
[1,1,1,2,2,2,3,3,1,1,5,5......]
[1,1,1,2,2,2,3,3,1,1,5,5......]
在 RxJava 中,我们可以使用 distinctUntilChanged() 来获取不同的项目,直到它改变了,结果就像
In RxJava we can use distinctUntilChanged() to get a distinct items until it changed and it would result like
[1,2,3,1,5,......]
[1,2,3,1,5,......]
同样,有没有办法缓冲项目直到更改?例如我期望像
Similarly is there a way to buffer the items until changed? for example I expect an output like
[[1,1,1] , [2,2,2] , [3,3] , [1,1] , [5,5]......]
[[1,1,1] , [2,2,2] , [3,3] , [1,1] , [5,5]......]
推荐答案
您可以共享源序列,将 distinctUntilChanged
应用到一个路径,然后将驱动 buffer
运算符使用 Observable
来指示边界:
You can share the source sequence, apply distinctUntilChanged
to one path which will then drive a buffer
operator that uses an Observable
to indicate boundaries:
@Test
@SuppressWarnings("unchecked")
public void test() {
Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
.compose(bufferUntilChanged(v -> v))
.test()
.assertResult(
Arrays.asList(1, 1, 1),
Arrays.asList(2, 2, 2),
Arrays.asList(3, 3),
Arrays.asList(1, 1),
Arrays.asList(5, 5)
);
}
static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
Function<T, K> keySelector) {
return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}
存在 skip(1)
是因为通过 distinctUntilChanged
的第一个项目会触发一个新的缓冲区,使第一个缓冲区为空.
The skip(1)
is there because the very first item passing through distinctUntilChanged
would trigger a new buffer, having the very first buffer empty.
这篇关于RxJava 缓冲区直到改变的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!