Rx - 按条件将流划分为段(列表) [英] Rx - Divide stream into segments (lists) by condition
问题描述
我有一个RX生产商,它创建一个像这样的字符串流(实际流的简化版):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6 ....
流是无穷无尽的,但是有序。因此,在以 A
开头的字符串用完之后, B
启动。当 B
用完时, C
启动...当 Z
用完了,我们搬到了 AA1
等。有一个未知数量的 A
, B
等等,但每封信件通常为10-30个实例。
我正在寻找一种方法将此流划分为所有A的块: A1 A2 A3
,所有B: B1 B2
,所有C: C1 C2 C3 C4 C5 C6
等。每个块可以是一个可观察的(我将变成一个列表)或只是一个列表。
<我使用RxJava尝试了几种不同的方法,所有方法都失败了。其中不起作用的是:
-
分组:因为流是无穷无尽的,每个字母的observable没有完成。所以当A的用完和B的开始时,A的Observable没有完成。所以可观察量越来越多。
-
带有distinctUntilChanged的窗口/缓冲区 - 我在原始流上使用distinctUntilChanged输出每组的第一项(第一个A,第一个B等)。然后我使用该流作为
window
的输入或缓冲运算符,用作窗口/缓冲区之间的边界。这不起作用,我得到的只是空列表。
什么是使用RX的正确解决方案?
我更喜欢Java解决方案,但是其他RX实现中可以轻松转换为Java的解决方案也非常受欢迎。
这是我解决这个问题的方法:
Observable< String> source = Observable.from(
A1,A2,A3,B1,B2,B3,C1,D1);
Observable< List< String>> output = Observable.defer(() - > {
List< String> buffer = new ArrayList<>();
return
Observable.concat(
source.concatMap (new Function< String,Observable< List< String>>>(){
String lastKey;
@Override
public Observable< List< String>> apply(String t) {
String key = t.substring(0,1);
if(lastKey!= null&&!key.equals(lastKey)){
List< String> b = new ArrayList<>(buffer);
buffer.clear();
buffer.add(t);
lastKey = key;
return Observable.just(b);
}
lastKey = key;
buffer.add(t);
return Observable.empty();
}
}),
Observable.just(1)
.flatMap(v - > {
if(buffer.isEmpty()){
return Observable.empty();
}
返回Observable.just(缓冲区);
})
);
}
);
output.subscribe(System.out :: println);
这是它的工作原理:
- 我正在使用延迟,因为我们需要一个每用户缓冲区,而不是一个全局缓冲区
- 如果来源,我将缓冲与最后一个缓冲区的发射连接起来恰好是有限的
- 我使用concatMap并添加到缓冲区,直到键发生变化,直到那时,我发出空的Observables。一旦密钥改变,我发出缓冲区的内容并开始一个新的。
I have an RX producer that creates a stream of strings like so (simplified version of the real stream):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
The stream is endless, but ordered. So after the strings that start with A
run out, B
starts. WhenB
runs out, C
starts... whenZ
run out, we move to AA1
etc. There's an unknown number of A
's, B
's etc, but it's typically 10-30 instances per letter.
I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3
, all B's: B1 B2
, all C's: C1 C2 C3 C4 C5 C6
etc. Each block can be either an observable (which I'll turn into a list) or simply a list.
I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:
Group by: since the stream is endless, the per-letter observable doesn't complete. So when the A's run out and the B's start, A's Observable doesn't complete. So there's an ever increasing number of observables.
Window/Buffer with distinctUntilChanged - I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc). Then I use that stream as an input to
window
or a "buffer" operator to be used as a boundary between windows/buffers. That didn't work and all I got was empty lists.
What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.
Here is a way I'd solve this:
Observable<String> source = Observable.from(
"A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
Observable<List<String>> output = Observable.defer(() -> {
List<String> buffer = new ArrayList<>();
return
Observable.concat(
source.concatMap(new Function<String, Observable<List<String>>>() {
String lastKey;
@Override
public Observable<List<String>> apply(String t) {
String key = t.substring(0, 1);
if (lastKey != null && !key.equals(lastKey)) {
List<String> b = new ArrayList<>(buffer);
buffer.clear();
buffer.add(t);
lastKey = key;
return Observable.just(b);
}
lastKey = key;
buffer.add(t);
return Observable.empty();
}
}),
Observable.just(1)
.flatMap(v -> {
if (buffer.isEmpty()) {
return Observable.empty();
}
return Observable.just(buffer);
})
);
}
);
output.subscribe(System.out::println);
This is how it works:
- I'm using defer because we need a per-subscriber buffer, not a global one
- I concatenate the buffering with the emission of the last buffer if the source happens to be finite
- I use concatMap and add to a buffer until the key changes, until then, I emit empty Observables. Once the key changed, I emit the contents of the buffer and start a new one.
这篇关于Rx - 按条件将流划分为段(列表)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!