RxJava 2等效于isUnsubscribed [英] RxJava 2 equivalent to isUnsubscribed
问题描述
I've been working through the examples in the book Reactive Programming with RxJava, which is targeted at version 1 not 2. An introduction to infinite streams has the following example (and notes there are better ways to deal with the concurrency):
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler = () -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
但是,在RxJava 2中,传递给create()
方法的lambda表达式的类型为ObservableEmitter
,而它没有isUnsubscribed()
方法.我看过 2.0有什么不同,执行了对存储库的搜索,但是找不到任何这样的方法.
However, in RxJava 2, the lambda expression passed to the create()
method is of type ObservableEmitter
and this doesn't have an isUnsubscribed()
method. I've had a look in What's Different in 2.0 and also performed a search of the repository but can't find any such method.
在2.0中如何实现相同的功能?
How would this same functionality be achieved in 2.0?
经过编辑,以包含以下给出的解决方案(使用Kotlin进行n.b.):
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
推荐答案
订阅Observable后,将返回Disposable
.您可以将其保存到本地变量,然后检查disposable.isDisposed()
以查看它是否仍在订阅.
After you subscribe to Observable, Disposable
is returned. You can save it to your local variable and check disposable.isDisposed()
to see if it still subscribing or not.
这篇关于RxJava 2等效于isUnsubscribed的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!