RxJava 在多个订阅者之间共享一个 Observable 的发射 [英] RxJava share an Observable's emissions between multiple subscribers

查看:29
本文介绍了RxJava 在多个订阅者之间共享一个 Observable 的发射的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下问题:

我有一个 observable 正在做一些工作,但其他 observable 需要该 observable 的输出才能工作.我尝试多次订阅同一个 observable,但在日志中我看到原始 observable 被多次启动.

I have a observable that is doing some work, but other observables need the output of that observable to work. I have tried to subscribe several times on the same observable, but inside the log i see that the original observable is initiated multiple times.

这就是我创建对象的可观察对象:

thats my observable thats create the object:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
            if (mApi == null) {
                //do some work
            }
            subscriber.onNext(mApi);
            subscriber.unsubscribe();
        })

这就是我需要对象的 observable

thats my observable that needs the object

loadApi().flatMap(api -> api....()));

我正在使用

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
                .unsubscribeOn(Schedulers.io()

在所有可观察对象上.

推荐答案

我不确定我是否正确理解了您的问题,但我认为您正在寻找一种方法来在多个订阅者之间共享 observable 的排放.有几种方法可以做到这一点.一方面,您可以像这样使用 Connectable Observable:

I'm not sure that I understood your question correctly, but I figure you're looking for a way to share the emissions of an observable between several subscribers. There are several ways of doing this. For one, you could use a Connectable Observable like so:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items

输出:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3

或者,您可以使用 PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));    
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable

输出:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3

这两个示例都是单线程的,但您可以轻松添加 observeOn 或 subscirbeOn 调用以使其异步.

Both of these examples are single threaded, but you can easily add observeOn or subscirbeOn calls to make them async.

这篇关于RxJava 在多个订阅者之间共享一个 Observable 的发射的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆