RxJava - ConnectableObservable,断开连接和重新连接 [英] RxJava - ConnectableObservable, disconnecting and reconnecting

查看:57
本文介绍了RxJava - ConnectableObservable,断开连接和重新连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从断开连接"中复制示例代码部分 这里.

<块引用>

断开连接

正如我们在 connect 的签名中看到的,这个方法返回一个 Subscription,就像 Observable.subscribe 一样.您可以使用该引用来终止 ConnectableObservable 的订阅.这将阻止事件传播给观察者,但不会从 ConnectableObservable 取消订阅它们.如果您再次调用 connect,ConnectableObservable 将开始新的订阅,而旧的观察者将再次开始接收值.

ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();订阅 s = connectable.connect();connectable.subscribe(i -> System.out.println(i));线程睡眠(1000);System.out.println(关闭连接");s.取消订阅();线程睡眠(1000);System.out.println(重新连接");s = connectable.connect();

输出

<代码>01234关闭连接重新连接012...


使用 RxJava 2.0.8,我有:

 ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();一次性 s = connectable.connect();connectable.subscribe(new Observer() {@覆盖公共无效订阅(一次性d){}@覆盖public void onNext(Long aLong) {Log.d("test", "Num: " + aLong);}@覆盖public void onError(Throwable e) {}@覆盖公共无效 onComplete() {}});尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",关闭连接");s.dispose();尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",正在重新连接...");可连接.connect();

输出

编号:0数量:1数量:2数量:3数量:4关闭连接正在重新连接...

提前致谢....

解决方案

RxJava 似乎没有采用这种行为.工作示例来自 Rx.NET.请参阅 https://github.com/ReactiveX/RxJava/issues/4771>

I am trying to replicate sample code from the "Disconnecting" section here.

Disconnecting

As we saw in connect's signature, this method returns a Subscription, just like Observable.subscribe does. You can use that reference to terminate the ConnectableObservable's subscription. That will stop events from being propagated to observers but it will not unsubscribe them from the ConnectableObservable. If you call connect again, the ConnectableObservable will start a new subscription and the old observers will begin receiving values again.

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();

connectable.subscribe(i -> System.out.println(i));

Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();

Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();

Output

0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...


Using RxJava 2.0.8, I have:

    ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
    Disposable s = connectable.connect();

    connectable.subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onNext(Long aLong) {
            Log.d("test", "Num: " + aLong);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Closing connection");
    s.dispose();

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Reconnecting...");
    connectable.connect();

Output

Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...

Thanks in advance....

解决方案

It seems this behaviour has not been adopted by RxJava. The working example is from Rx.NET. See https://github.com/ReactiveX/RxJava/issues/4771

这篇关于RxJava - ConnectableObservable,断开连接和重新连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆