RxJava - ConnectableObservable,断开连接和重新连接 [英] RxJava - ConnectableObservable, disconnecting and reconnecting
问题描述
我正在尝试从断开连接"中复制示例代码部分 这里.
<块引用>断开连接
正如我们在 connect 的签名中看到的,这个方法返回一个 Subscription,就像 Observable.subscribe 一样.您可以使用该引用来终止 ConnectableObservable 的订阅.这将阻止事件传播给观察者,但不会从 ConnectableObservable 取消订阅它们.如果您再次调用 connect,ConnectableObservable 将开始新的订阅,而旧的观察者将再次开始接收值.
ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();订阅 s = connectable.connect();connectable.subscribe(i -> System.out.println(i));线程睡眠(1000);System.out.println(关闭连接");s.取消订阅();线程睡眠(1000);System.out.println(重新连接");s = connectable.connect();
输出
<代码>01234关闭连接重新连接012...
使用 RxJava 2.0.8,我有:
ConnectableObservable可连接 = Observable.interval(200, TimeUnit.MILLISECONDS).publish();一次性 s = connectable.connect();connectable.subscribe(new Observer() {@覆盖公共无效订阅(一次性d){}@覆盖public void onNext(Long aLong) {Log.d("test", "Num: " + aLong);}@覆盖public void onError(Throwable e) {}@覆盖公共无效 onComplete() {}});尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",关闭连接");s.dispose();尝试 {线程睡眠(1000);} catch (InterruptedException e) {e.printStackTrace();}Log.d(测试",正在重新连接...");可连接.connect();
输出
编号:0数量:1数量:2数量:3数量:4关闭连接正在重新连接...
提前致谢....
RxJava 似乎没有采用这种行为.工作示例来自 Rx.NET.请参阅 https://github.com/ReactiveX/RxJava/issues/4771>
I am trying to replicate sample code from the "Disconnecting" section here.
Disconnecting
As we saw in connect's signature, this method returns a Subscription, just like Observable.subscribe does. You can use that reference to terminate the ConnectableObservable's subscription. That will stop events from being propagated to observers but it will not unsubscribe them from the ConnectableObservable. If you call connect again, the ConnectableObservable will start a new subscription and the old observers will begin receiving values again.
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
Output
0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...
Using RxJava 2.0.8, I have:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Disposable s = connectable.connect();
connectable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("test", "Num: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Closing connection");
s.dispose();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Reconnecting...");
connectable.connect();
Output
Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...
Thanks in advance....
It seems this behaviour has not been adopted by RxJava. The working example is from Rx.NET. See https://github.com/ReactiveX/RxJava/issues/4771
这篇关于RxJava - ConnectableObservable,断开连接和重新连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!