为什么我的 RxJava Observable 只向第一个消费者发出信号? [英] Why does my RxJava Observable emit only to the first consumer?

查看:44
本文介绍了为什么我的 RxJava Observable 只向第一个消费者发出信号?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

谁能解释为什么下面的测试失败?

Can someone explain why the below test fails?

public class ObservableTest {
    @Test
    public void badObservableUsedTwiceDoesNotEmitToSecondConsumer() {
        // Any simpler observable makes the test pass
        Observable<Integer> badObservable = Observable.just(1)
                .zipWith(Observable.just(2), (one, two) -> Observable.just(3))
                .flatMap(observable -> observable);

        ObservableCalculator calc1 = new ObservableCalculator(badObservable);
        ObservableCalculator calc2 = new ObservableCalculator(badObservable);

        // zipping causes the failure
        // Calling calculate().toBlocking().subscribe() on each calc passes
        // Observable.from(listOfCalcs).flatMap(calc -> calc.calculate()) passes
        Observable.zip(ImmutableList.of(calc1.calculate(), calc2.calculate()), results -> results)
                .toBlocking()
                .subscribe();

        assertThat(calc1.hasCalculated).isTrue();
        assertThat(calc2.hasCalculated).isTrue(); // this fails
    }

    private static class ObservableCalculator {
        private final Observable<?> observable;

        public boolean hasCalculated = false;

        public ObservableCalculator(Observable<?> observable) {
            this.observable = observable;
        }

        public Observable<Void> calculate() {
            return observable.concatMap(o -> {
                hasCalculated = true;
                // returning Observable.just(null) makes the test pass
                return Observable.empty();
            });
        }
    }
}

我试图进一步简化坏"可观察对象,但找不到任何可以删除的内容以使其更简单.

I've tried to simplify the "bad" observable further, but can't find anything I can remove to make it simpler.

不过,我目前的理解是,它是一个 Observable(无论它是如何构造的),应该发出一个值然后完成.然后我们基于那个 Observable 创建两个相似的对象实例,并在这些对象上调用一个使用 Observable 的方法,记下已经这样做了,然后返回 Observable.empty().

My current understanding, though, is that it's an Observable which (regardless of how it's constructed), should emit a single value and then complete. We then make two similar instances of an object based on that Observable, and call a method on those objects which consumes the Observable, makes a note of having done so, and then returns Observable.empty().

谁能解释为什么使用这个 observable 会导致测试失败(当使用更简单的 observable 导致测试通过时)?

Can anyone explain why using this observable causes the test the fail (when using a simpler observable causes the test to pass)?

也可以通过连续调用 calculate().toBlocking().subscribe() 而不是使用 zip,或者让 calculate 返回 Observable.just(null) 来使测试通过.这对我来说一些有意义(如果 calc1 为空,zip 不会订阅 calc2,因为在这种情况下 zip 永远不会产生任何东西),但不完全有意义(我不明白为什么 zip对于更简单版本的 badObservable 而言,它的行为不会像那样——无论输入如何,calculate() 方法仍然返回空.

It's also possible to make the test pass by either serially calling calculate().toBlocking().subscribe() rather than using zip, or making calculate return Observable.just(null) instead. That makes some sense to me (zip won't subscribe to calc2 if calc1 is empty, since it in that case zip could never yield anything), but not complete sense (I don't understand why zip doesn't behave like that for a simpler version of badObservable - the calculate() methods still return empty, regardless of that input).

推荐答案

如果你用一些东西压缩一个空的源,操作员会检测到它不能再产生任何值并取消订阅它的所有源.涉及 zip 和 merge 的混合,merge 非常重视取消订阅:它根本不发出值 3,因此 concatMap 也不会调用第二个源的映射函数.

If you zip an empty source with something, the operator detects it can't produce any value anymore and unsubscribes from all of its sources. There is a mix of zip and merge involved and merge takes unsubscription seriously: it doesn't emit the value 3 at all thus concatMap doesn't call the mapping function for the second source either.

这篇关于为什么我的 RxJava Observable 只向第一个消费者发出信号?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆