RxJava 订阅阻塞 observable [英] RxJava subscribe to blocking observable

查看:20
本文介绍了RxJava 订阅阻塞 observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想实现以下目标:

String result = myObservable.toBlocking().first();

即它就像一个普通的函数调用.但是这永远不会发生,因为您需要订阅它,我不知道该怎么做.如果我订阅它,结果将在另一个范围内,并且代码非常难看,因为无论如何我只能像常规 observable 一样获得结果,因此将其变成阻塞 observable 毫无意义.

i.e. it is like a regular function call. However this never happens because you'd need to subscribe to it, which I don't know how to do. If I subscribe to it, the result will be in another scope, and the code is very ugly because I can only get the result like its a regular observable anyway, so there's no point turning it into a blocking observable.

推荐答案

它确实如你所愿:

    Observable<String> myObservable = Observable.just("firstValue", "secondValue");
    String result = myObservable.toBlocking().first();
    System.out.println(result); // ---> "firstValue"

在幕后,调用 BlockingObservable.first() 为您进行订阅:

Under the hood, calling BlockingObservable.first() does the subscription for you:

private T blockForSingle(final Observable<? extends T> observable) {
    final AtomicReference<T> returnItem = new AtomicReference<T>();
    final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);

    @SuppressWarnings("unchecked")
    Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }

        @Override
        public void onError(final Throwable e) {
            returnException.set(e);
            latch.countDown();
        }

        @Override
        public void onNext(final T item) {
            returnItem.set(item);
        }
    });
    BlockingUtils.awaitForComplete(latch, subscription);

    if (returnException.get() != null) {
        Exceptions.propagate(returnException.get());
    }

    return returnItem.get();
}

UPDATE:如果使用 BehaviourSubject 加上 toBlocking() 没有任何意义.考虑到它既是 Observable 又是 Observer 所以在某处,应该调用 myObservable.onNext("value").如果您通过调用 toBlocking() 阻塞线程,除非 myObservable 在调用 onNext() 的其他线程中可用,否则您将被屏蔽了.

UPDATE: If doesn't make any sense to use a BehaviourSubject plus toBlocking(). Have into account that it is both and Observable and an Observer so somewhere, myObservable.onNext("value") should be invoked. If you block the thread by calling toBlocking(), unless myObservable is available in some other thread where onNext() is called, you are gonna get blocked.

例如,这是`BehaviourSubject 的正常使用:

For instance, this is the normal use of a `BehaviourSubject:

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

这篇关于RxJava 订阅阻塞 observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆