具有多个订户的单一可观察 [英] Single Observable with Multiple Subscribers
问题描述
我有一个Observable<<List<Foo>> getFoo()
,它是从改造服务创建的,并在调用
.getFoo()
方法,我需要与多个订户共享.但是,调用.share()
方法将导致重新执行网络调用.重播操作符也不起作用.我知道一个可能的解决方案可能是.cache()
,但我不知道为什么会导致这种行为.
I have an Observable<<List<Foo>> getFoo()
that is created from a Retrofit Service and after calling the
.getFoo()
method, I need to share it with Multiple Subscribers. Calling the .share()
method though, it causes the Network Call to be re-executed. Replay Operator does not work either. I know that a potential solution might be .cache()
, but I do not know why this behaviour is caused.
// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(API_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors + " -> 2");
}
});
subscription1.unsubscribe();
subscription2.unsubscribe();
上面的代码可以重现上述行为.您可以对其进行调试,以查看收到的列表属于不同的MemoryAddress.
The code above can reproduce the aforementioned behaviour. You can debug it and see that the Lists received belong to a different MemoryAddress.
我也将ConnectableObservables视为一种潜在的解决方案,但这要求我随身携带原始的Observable,并且每次要添加新的Subscriber时都调用.connect()
.
I have also looked at ConnectableObservables as a potential solution, but this requires me carrying the original observable around, and calling .connect()
each time I want to add a new Subscriber.
在Retrofit 1.9之前,.share()
的这种行为可以正常工作.它停止了Retrofit 2-beta的工作.我还没有使用几个小时前发布的Retrofit 2 Release Edition进行测试.
This kind of behaviour with the .share()
was working fine till Retrofit 1.9. It stopped working on Retrofit 2 - beta. I have not yet tested it with the Retrofit 2 Release Version, that was released some hours ago.
2017年1月2日
对于将来的读者,我写了一篇文章这里详细解释此案!
For future readers, I have written an article here explaining more about the case!
推荐答案
您似乎(隐式地)将.share()
返回的ConnectedObservable
强制转换为普通的Observable
.您可能想了解热的和冷的可观察物之间的区别.
You seem to be (implicitly) casting your ConnectedObservable
returned by .share()
back into a normal Observable
. You might want to read up on the difference between hot and cold observables.
尝试
ConnectedObservable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors + " -> 2");
}
});
testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();
您不需要每次要新订阅时都调用connect()
,只需启动新观察即可.我想您可以使用replay()
来确保所有后续订阅者都能制作出所有项目
You don't need to call connect()
every time you want a new subscription you only need it to start up the observable. I suppose you could use replay()
to make sure all subsequent subscribers get all items produced
ConnectedObservable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share()
.replay()
这篇关于具有多个订户的单一可观察的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!