两个有序观测值的完全外部联接 [英] Full outer join of two ordered observables
问题描述
假设我们有两个可观测值Observable<Integer> o1
和Observable<Integer> o2
,每个可观测值都严格按照递增的顺序进行.
Suppose we have two observables Observable<Integer> o1
and Observable<Integer> o2
and each observable is producing strictly increasing sequence.
任务是对这两个可观察对象执行完全外部联接的等效操作.例如
The task is to perform equivalent of full outer join on these two observables. For example join of
Observable.just(0, 2, 3, 6)
Observable.just(1, 2, 3, 4, 5, 6)
应该产生
[ [0, _], [_, 1], [2, 2], [3, 3], [_, 4], [_, 5], [6, 6] ]
联接应该有效,并且在非常大或无限的流上都可以很好地工作.
The join should be efficient and work well on very large or infinite streams.
在拉场景中,该解决方案很容易.是否有惯用的rx方法来实现这一目标?
The solution is easy in pull scenario. Is there an idiomatic rx way to achieve this?
推荐答案
对此没有单个运算符,但是可以由标准运算符和扩展运算符组成行为:
There is no single operator for this but it is possible to compose the behavior from standard and extension operators:
static abstract class Pair implements Comparable<Pair> {
int value;
@Override
public int compareTo(Pair o) {
return Integer.compare(value, o.value);
}
}
static final class Left extends Pair {
Left(int value) {
this.value = value;
}
@Override
public String toString() {
return "[" + value + ", _]";
}
}
static final class Right extends Pair {
Right(int value) {
this.value = value;
}
@Override
public String toString() {
return "[_, " + value + "]";
}
}
static final class Both extends Pair {
Both(int value) {
this.value = value;
}
@Override
public int hashCode() {
return value;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Both) {
return ((Both)obj).value == value;
}
return false;
}
@Override
public String toString() {
return "[" + value + ", " + value + "]";
}
}
@SuppressWarnings("unchecked")
@Test
public void test() {
Flowable<Integer> a = Flowable.just(0, 2, 3, 6);
Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable.defer(() -> {
boolean[] skip = { false };
return Flowables.<Pair>orderedMerge(
a.<Pair>map(Left::new), b.<Pair>map(Right::new)
)
.distinctUntilChanged()
.buffer(2, 1)
.flatMapIterable(buf -> {
if (skip[0]) {
skip[0] = false;
return Collections.emptyList();
}
if (buf.size() == 2) {
if (buf.get(0).value == buf.get(1).value) {
skip[0] = true;
return Collections.singletonList(new Both(buf.get(0).value));
}
return buf.subList(0, 1);
}
return buf;
});
})
.subscribe(System.out::println);
}
Flowables.orderedMerge
在 RxJava 2扩展库中.
这篇关于两个有序观测值的完全外部联接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!