RxJava,一个可观察的多个订阅者:publish().autoConnect() [英] RxJava, one observable multiple subscribers: publish().autoConnect()

查看:68
本文介绍了RxJava,一个可观察的多个订阅者:publish().autoConnect()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在玩 rxJava/rxAndroid,但有一些非常基本的东西与我预期的不一样.我有一个 observable 和两个订阅者:

I'm playing around with rxJava/rxAndroid and there's something very basic that doesn't behave as I'd expect. I have this one observable and two subscribers:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3

现在,我知道我可以通过使用 publish().autoConnect() 来避免重复计数,但我试图首先了解这种默认行为.每次有人订阅 observable 时,它​​就会开始发出数字序列.我明白了.因此,当 Subscriber 1 连接时,它开始发出项目.Subscriber 2 立即连接,为什么它也没有得到值?

Now, I know I could avoid repeating the count by using publish().autoConnect() but I'm trying to understand this default behaviour first. Each time someone subscribe to the observable it starts emitting the number sequence. I get that. So, when when Subscriber 1 connects it starts emitting items. Subscriber 2 connects right away, why isn't it getting the values as well?

我是这么理解的,从可观察的角度来看:

This is how I understand it, From the perspective of the observable:

  1. 有人订阅了我,我应该开始发布项目
    [订阅者:1][要发出的物品:1,2,3]

向订阅者发送项目1"
[订阅者:1][要发出的物品:2,3]

其他人订阅了我,我完成后会再次发出 1,2,3
[订阅者:1 &2][要发射的物品:2,3,1,2,3]

Someone else subscribed to me, I'll emit 1,2,3 once again when I'm done
[SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 2,3,1,2,3]

向订阅者发送项目2"
[订阅者:1 &2][要发射的物品:3,1,2,3]

向订阅者发送项目3"
[订阅者:1 &2][要发射的物品:1,2,3]

向订阅者发送项目1"
[订阅者:1 &2][要发射的物品:2,3]

...

但这不是它的工作方式.就好像它们是两个独立的可观察对象合二为一.这让我很困惑,他们为什么不将这些项目提供给所有订阅者?

But this is not how it works. It's like they are two separate observables in one. this confuses me, why don't they give the items to all subscribers?

奖励:

publish().autoConnect() 如何解决这个问题?让我们分解一下.publish() 给了我一个可连接的 observable.一个可连接的 observable 就像一个普通的 observable 但你可以告诉它什么时候连接.然后我继续通过调用 autoConnect()

How is that publish().autoConnect() fixes the problem? Let's break it down. publish() gives me a connectable observable. a connectable observable is just like a regular observable but you can tell it when to connect. Then I go ahead tell it to connect right away by calling autoConnect()

这样做……我是不是得到了和开始时一样的东西?一个普通的常规可观察对象.运营商似乎相互抵消.

By doing so... don't I get the same thing I started with? A plain regular observable. The operators appear to cancel each other.

我可以闭嘴并使用publish().autoconnect().但我想更多地了解 observables 的工作原理.

I could just shut up and use publish().autoconnect(). But I'd like to understand more about how observables work.

谢谢!

推荐答案

这是因为实际上它们是两个独立的 observable.当您调用 subscribe() 时,它们会产生".因此,您提供的步骤在步骤 3 & 的意义上是不正确的.4 只是 1 &2 但在不同的 observable 上.

This is because in fact those are two separate observables. They are "spawned" when you invoke subscribe(). Therefore the steps you have provided are incorrect in sense that step 3 & 4 are just 1 & 2 but on a different observable.

但是您将它们视为 1 1 1 2 2 2,因为日志记录发生在线程上.如果您要删除 observeOn() 部分,那么您将看到以交织的方式排放.要在下面查看此运行代码:

But you see them as 1 1 1 2 2 2 because of thread on which the logging happens. If you were to remove the observeOn() part then you would see emissions in a interwoven manner. To see this run code below:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);

    dataStream.subscribe(i -> System.out.println("1  " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + Thread.currentThread().getName() + " " + (i - l)));

    Thread.sleep(1000);
}

输出,至少在我的运行中是(注意线程名称):

Output, at least in my run was(notice thread names):

1  RxComputationThreadPool-1 135376988
2  RxComputationThreadPool-2 135376988
1  RxComputationThreadPool-1 135486815
2  RxComputationThreadPool-2 135537383
1  RxComputationThreadPool-1 135560691
2  RxComputationThreadPool-2 135617580

如果你应用 observeOn() 它变成:

and if you apply the observeOn() it becomes:

1  RxSingleScheduler-1 186656395
1  RxSingleScheduler-1 187919407
1  RxSingleScheduler-1 187923753
2  RxSingleScheduler-1 186656790
2  RxSingleScheduler-1 187860148
2  RxSingleScheduler-1 187864889

正如您正确指出的,要获得您想要的内容,您需要 publish().refcount() 或简单的 share()(它是别名)操作员.

As you have correctly pointed out, to get what you want you need the publish().refcount() or simply share()(it is an alias) operator.

这是因为 publish() 创建了一个 ConnectableObservable,它不会开始发出项目,直到通过 connect() 被告知这样做方法.在这种情况下,如果您这样做:

This is because the publish() creates a ConnectableObservable which does not start emitting items until told to do so via the connect() method. in which case if you do this:

@Test
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();

    dataStream.subscribe(i -> System.out.println("1  " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + (i - l)));

    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);

}

您会注意到,在第一秒(第一个 Thread.sleep() 调用)没有任何反应,就在 dataStream.connect() 被称为排放之后发生.

You will notice that for the first second(the first Thread.sleep() invocation) nothing happens and just after the dataStream.connect() is called the emissions happen.

refCount() 接收一个 ConnectableObservable 并通过计算当前订阅的订阅者数量来向订阅者隐藏调用 connect() 的需要.它所做的是在第一次订阅时调用 connect() 并且在最后一次取消订阅后取消订阅原始 observable.

refCount() takes in a ConnectableObservable and hides from subscribers the need to call connect() by counting how many subscribers are currently subscribed. What it does is upon first subscription it calls connect() and after last unsubscription is unsubscribes from the original observable.

至于publish().autoConnect()的相互取消,之后你确实得到了一个observable但是它有一个特殊的属性,比如说原来的observable在互联网上做了一个API调用(持续 10 秒),当您在没有 share() 的情况下使用它时,您最终会向服务器发送与这 10 秒内订阅的数量一样多的并行查询.另一方面,使用 share() 你将只有一个调用.

As to the mutual cancellation of the publish().autoConnect(), afterwards you do get an observable but it has one special property, say that the original observable does an API call over the Internet(lasting 10 second), when you use it without share() you will end up with as many parallel queries to the server as there were subscriptions over those 10 seconds. On the other hand with share() you will have only one call.

如果一个被共享的 observable 完成它的工作非常快(比如 just(1,2,3)),你不会看到它的任何好处.

You will not see any upside of it, if an observable that is shared completes its work very fast (like just(1,2,3)).

autoConnect()/refCount() 为您提供一个中间 observable,您订阅它而不是原始 observable.

autoConnect()/refCount() gives you an intermediate observable to which you subscribe to instead of the original observable.

如果您有兴趣深入阅读本书:Reactive Programming with RxJava

If you are interested dive into this book: Reactive Programming with RxJava

这篇关于RxJava,一个可观察的多个订阅者:publish().autoConnect()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆