Rx - 按条件将流划分为段(列表) [英] Rx - Divide stream into segments (lists) by condition

查看:192
本文介绍了Rx - 按条件将流划分为段(列表)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个RX生产商,它创建一个像这样的字符串流(实际流的简化版):



A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6 ....



流是无穷无尽的,但是有序。因此,在以 A 开头的字符串用完之后, B 启动。当 B 用完时, C 启动...当 Z 用完了,我们搬到了 AA1 等。有一个未知数量的 A B 等等,但每封信件通常为10-30个实例。



我正在寻找一种方法将此流划分为所有A的块: A1 A2 A3 ,所有B: B1 B2 ,所有C: C1 C2 C3 C4 C5 C6 等。每个块可以是一个可观察的(我将变成一个列表)或只是一个列表。



<我使用RxJava尝试了几种不同的方法,所有方法都失败了。其中不起作用的是:




  • 分组:因为流是无穷无尽的,每个字母的observable没有完成。所以当A的用完和B的开始时,A的Observable没有完成。所以可观察量越来越多。


  • 带有distinctUntilChanged的窗口/缓冲区 - 我在原始流上使用distinctUntilChanged输出每组的第一项(第一个A,第一个B等)。然后我使用该流作为 window 的输入或缓冲运算符,用作窗口/缓冲区之间的边界。这不起作用,我得到的只是空列表。




什么是使用RX的正确解决方案?
我更喜欢Java解决方案,但是其他RX实现中可以轻松转换为Java的解决方案也非常受欢迎。

解决方案

这是我解决这个问题的方法:

  Observable< String> source = Observable.from(
A1,A2,A3,B1,B2,B3,C1,D1);

Observable< List< String>> output = Observable.defer(() - > {
List< String> buffer = new ArrayList<>();
return
Observable.concat(
source.concatMap (new Function< String,Observable< List< String>>>(){
String lastKey;
@Override
public Observable< List< String>> apply(String t) {
String key = t.substring(0,1);
if(lastKey!= null&&!key.equals(lastKey)){
List< String> b = new ArrayList<>(buffer);
buffer.clear();
buffer.add(t);
lastKey = key;
return Observable.just(b);
}
lastKey = key;
buffer.add(t);
return Observable.empty();
}
}),
Observable.just(1)
.flatMap(v - > {
if(buffer.isEmpty()){
return Observable.empty();
}
返回Observable.just(缓冲区);
})
);
}
);

output.subscribe(System.out :: println);

这是它的工作原理:




  • 我正在使用延迟,因为我们需要一个每用户缓冲区,而不是一个全局缓冲区

  • 如果来源,我将缓冲与最后一个缓冲区的发射连接起来恰好是有限的

  • 我使用concatMap并添加到缓冲区,直到键发生变化,直到那时,我发出空的Observables。一旦密钥改变,我发出缓冲区的内容并开始一个新的。


I have an RX producer that creates a stream of strings like so (simplified version of the real stream):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

The stream is endless, but ordered. So after the strings that start with A run out, B starts. WhenB runs out, C starts... whenZ run out, we move to AA1 etc. There's an unknown number of A's, B's etc, but it's typically 10-30 instances per letter.

I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3, all B's: B1 B2, all C's: C1 C2 C3 C4 C5 C6 etc. Each block can be either an observable (which I'll turn into a list) or simply a list.

I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:

  • Group by: since the stream is endless, the per-letter observable doesn't complete. So when the A's run out and the B's start, A's Observable doesn't complete. So there's an ever increasing number of observables.

  • Window/Buffer with distinctUntilChanged - I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc). Then I use that stream as an input to window or a "buffer" operator to be used as a boundary between windows/buffers. That didn't work and all I got was empty lists.

What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.

解决方案

Here is a way I'd solve this:

Observable<String> source = Observable.from(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);

This is how it works:

  • I'm using defer because we need a per-subscriber buffer, not a global one
  • I concatenate the buffering with the emission of the last buffer if the source happens to be finite
  • I use concatMap and add to a buffer until the key changes, until then, I emit empty Observables. Once the key changed, I emit the contents of the buffer and start a new one.

这篇关于Rx - 按条件将流划分为段(列表)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆