具有多个订户的单一可观察 [英] Single Observable with Multiple Subscribers

查看:73
本文介绍了具有多个订户的单一可观察的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个Observable<<List<Foo>> getFoo(),它是从改造服务创建的,并在调用 .getFoo()方法,我需要与多个订户共享.但是,调用.share()方法将导致重新执行网络调用.重播操作符也不起作用.我知道一个可能的解决方案可能是.cache(),但我不知道为什么会导致这种行为.

I have an Observable<<List<Foo>> getFoo() that is created from a Retrofit Service and after calling the .getFoo() method, I need to share it with Multiple Subscribers. Calling the .share() method though, it causes the Network Call to be re-executed. Replay Operator does not work either. I know that a potential solution might be .cache(), but I do not know why this behaviour is caused.

// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();

// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
       .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors);
            }
         });

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
         });

subscription1.unsubscribe();
subscription2.unsubscribe();

上面的代码可以重现上述行为.您可以对其进行调试,以查看收到的列表属于不同的MemoryAddress.

The code above can reproduce the aforementioned behaviour. You can debug it and see that the Lists received belong to a different MemoryAddress.

我也将ConnectableObservables视为一种潜在的解决方案,但这要求我随身携带原始的Observable,并且每次要添加新的Subscriber时都调用.connect().

I have also looked at ConnectableObservables as a potential solution, but this requires me carrying the original observable around, and calling .connect() each time I want to add a new Subscriber.

在Retrofit 1.9之前,.share()的这种行为可以正常工作.它停止了Retrofit 2-beta的工作.我还没有使用几个小时前发布的Retrofit 2 Release Edition进行测试.

This kind of behaviour with the .share() was working fine till Retrofit 1.9. It stopped working on Retrofit 2 - beta. I have not yet tested it with the Retrofit 2 Release Version, that was released some hours ago.

2017年1月2日

对于将来的读者,我写了一篇文章这里详细解释此案!

For future readers, I have written an article here explaining more about the case!

推荐答案

您似乎(隐式地)将.share()返回的ConnectedObservable强制转换为普通的Observable.您可能想了解热的和冷的可观察物之间的区别.

You seem to be (implicitly) casting your ConnectedObservable returned by .share() back into a normal Observable. You might want to read up on the difference between hot and cold observables.

尝试

ConnectedObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
   .subscribe(new Subscriber<List<Contributor>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onNext(List<Contributor> contributors) {
        System.out.println(contributors);
    }
});

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
        });

testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();

您不需要每次要新订阅时都调用connect(),只需启动新观察即可.我想您可以使用replay()来确保所有后续订阅者都能制作出所有项目

You don't need to call connect() every time you want a new subscription you only need it to start up the observable. I suppose you could use replay() to make sure all subsequent subscribers get all items produced

ConnectedObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share()
        .replay()

这篇关于具有多个订户的单一可观察的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆