RxJava;如何同步发出可观察量 [英] RxJava; How to emit observables synchronously

查看:133
本文介绍了RxJava;如何同步发出可观察量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想同步发出两个Observable对象(它们是异步的),一个接一个地返回第一个发出的Observable对象。如果第一个失败,它不应该发出第二个。

I want to synchronously emit two Observable objects (which are asynchronous), one after the other where it returns the first emitted Observable object. If the first one fails, it should not emit the second one.

假设我们有一个Observable签署用户,另一个Observable自动选择用户的帐户,登录后。

Let's say we have one Observable that signs a user in, and another Observable that automatically selects the user's account, after signing in.

这就是我的尝试:

public Observable<AccessToken> signInAndSelectAccount(String username, String password)
{

    Observable<AccessToken> ob1 = ...; // Sign in.
    Observable<Account> ob2 = ...; // Select account.


    return Observable.zip(
            ob1,
            ob2,
            new Func2<AccessToken, Account, AccessToken>() {
                @Override
                public AccessToken call(AccessToken accessToken, Account account)
                {
                     return accessToken;
                }
            });
}

遗憾的是,这不适用于我的用例。它将以ob1开头并行发出/调用两个observable。

This unfortunately does not work for my use case. It will emit/call both observables parallel, starting with 'ob1'.

有人遇到类似的用例吗?或者想知道如何让observables以同步的方式等待彼此,可以返回第一个发出的?

Did someone encounter a similar use case? Or has an idea on how to make observables wait for eachother in a synchronous way, where the first emitted can be returned?

提前致谢。

推荐答案

在反应式编程中没有等待这样的术语。您需要考虑创建数据流,其中一个 Observable 可以由另一个触发。在收到令牌后,您需要收到帐户。它可能如下所示:

There is no such term as "wait" in reactive programming. You need to think about creating of a data stream, where one Observable could be triggered by another. In your case after receiving token you need to receive account. It could look like this:

Observable<Account> accountObservable = Observable.create(new Observable.OnSubscribe<AccessToken>() {
    @Override public void call(Subscriber<? super AccessToken> subscriber) {
        subscriber.onNext(new AccessToken());
        subscriber.onCompleted();
    }
}).flatMap(accessToken -> Observable.create(new Observable.OnSubscribe<Account>() {
    @Override public void call(Subscriber<? super Account> subscriber) {
        subscriber.onNext(new Account(accessToken));
        subscriber.onCompleted();
    }
}));

这篇关于RxJava;如何同步发出可观察量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆