两个有序观测值的完全外部联接 [英] Full outer join of two ordered observables

查看:92
本文介绍了两个有序观测值的完全外部联接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我们有两个可观测值Observable<Integer> o1Observable<Integer> o2,每个可观测值都严格按照递增的顺序进行.

Suppose we have two observables Observable<Integer> o1 and Observable<Integer> o2 and each observable is producing strictly increasing sequence.

任务是对这两个可观察对象执行完全外部联接的等效操作.例如

The task is to perform equivalent of full outer join on these two observables. For example join of

    Observable.just(0, 2, 3, 6)
    Observable.just(1, 2, 3, 4, 5, 6)

应该产生

    [ [0, _], [_, 1], [2, 2], [3, 3], [_, 4], [_, 5], [6, 6] ]

联接应该有效,并且在非常大或无限的流上都可以很好地工作.

The join should be efficient and work well on very large or infinite streams.

在拉场景中,该解决方案很容易.是否有惯用的rx方法来实现这一目标?

The solution is easy in pull scenario. Is there an idiomatic rx way to achieve this?

推荐答案

对此没有单个运算符,但是可以由标准运算符和扩展运算符组成行为:

There is no single operator for this but it is possible to compose the behavior from standard and extension operators:

static abstract class Pair implements Comparable<Pair> { 
    int value;

    @Override
    public int compareTo(Pair o) {
        return Integer.compare(value, o.value);
    }
}

static final class Left extends Pair {
    Left(int value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "[" + value + ", _]";
    }
}

static final class Right extends Pair {
    Right(int value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "[_, " + value + "]";
    }
}

static final class Both extends Pair {
    Both(int value) {
        this.value = value;
    }

    @Override
    public int hashCode() {
        return value;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Both) {
            return ((Both)obj).value == value;
        }
        return false;
    }

    @Override
    public String toString() {
        return "[" + value + ", " + value + "]";
    }
}

@SuppressWarnings("unchecked")
@Test
public void test() {
    Flowable<Integer> a = Flowable.just(0, 2, 3, 6);
    Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6);

    Flowable.defer(() -> {
        boolean[] skip = { false };
        return Flowables.<Pair>orderedMerge(
                a.<Pair>map(Left::new), b.<Pair>map(Right::new)
            )
            .distinctUntilChanged()
            .buffer(2, 1)
            .flatMapIterable(buf -> {
                if (skip[0]) {
                    skip[0] = false;
                    return Collections.emptyList();
                }
                if (buf.size() == 2) {
                    if (buf.get(0).value == buf.get(1).value) {
                        skip[0] = true;
                        return Collections.singletonList(new Both(buf.get(0).value));
                    }
                    return buf.subList(0, 1);
                }
                return buf;
            });
    })
    .subscribe(System.out::println);
}

Flowables.orderedMerge RxJava 2扩展库中.

这篇关于两个有序观测值的完全外部联接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆