RxJava 订阅阻塞 observable [英] RxJava subscribe to blocking observable
问题描述
我想实现以下目标:
String result = myObservable.toBlocking().first();
即它就像一个普通的函数调用.但是这永远不会发生,因为您需要订阅它,我不知道该怎么做.如果我订阅它,结果将在另一个范围内,并且代码非常难看,因为无论如何我只能像常规 observable 一样获得结果,因此将其变成阻塞 observable 毫无意义.
i.e. it is like a regular function call. However this never happens because you'd need to subscribe to it, which I don't know how to do. If I subscribe to it, the result will be in another scope, and the code is very ugly because I can only get the result like its a regular observable anyway, so there's no point turning it into a blocking observable.
推荐答案
它确实如你所愿:
Observable<String> myObservable = Observable.just("firstValue", "secondValue");
String result = myObservable.toBlocking().first();
System.out.println(result); // ---> "firstValue"
在幕后,调用 BlockingObservable.first()
为您进行订阅:
Under the hood, calling BlockingObservable.first()
does the subscription for you:
private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<T> returnItem = new AtomicReference<T>();
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(final Throwable e) {
returnException.set(e);
latch.countDown();
}
@Override
public void onNext(final T item) {
returnItem.set(item);
}
});
BlockingUtils.awaitForComplete(latch, subscription);
if (returnException.get() != null) {
Exceptions.propagate(returnException.get());
}
return returnItem.get();
}
UPDATE:如果使用 BehaviourSubject
加上 toBlocking()
没有任何意义.考虑到它既是 Observable
又是 Observer
所以在某处,应该调用 myObservable.onNext("value")
.如果您通过调用 toBlocking()
阻塞线程,除非 myObservable
在调用 onNext()
的其他线程中可用,否则您将被屏蔽了.
UPDATE: If doesn't make any sense to use a BehaviourSubject
plus toBlocking()
. Have into account that it is both and Observable
and an Observer
so somewhere, myObservable.onNext("value")
should be invoked. If you block the thread by calling toBlocking()
, unless myObservable
is available in some other thread where onNext()
is called, you are gonna get blocked.
例如,这是`BehaviourSubject 的正常使用:
For instance, this is the normal use of a `BehaviourSubject:
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
这篇关于RxJava 订阅阻塞 observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!