在错误情况下如何组合多个RxJava链非阻塞 [英] How to combine multiple RxJava chains non-blocking in error case

查看:65
本文介绍了在错误情况下如何组合多个RxJava链非阻塞的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的要求:

  • N个并行的翻新呼叫
  • 等待所有呼叫结束(成功或失败)
  • 如果k个(0 <== k null或错误对象,而成功的调用可以返回ResponseBody对象.
  • 可选:如果RxJava有一种简单的方法来区分每次成功或失败的调用,那么很好,如果不是,我将解析响应并自己弄清楚
  • N Retrofit calls in parallel
  • Wait for all calls to finish (success or failure)
  • If k (0<= k < N) calls fail, they should NOT block the others. Imagine failed calls can return null or error objects while successful ones return the ResponseBody objects.
  • Optional: if RxJava has an easy way to distinguish which call is which for each success or failure, great, if not, I'll parse the responses and figure it out myself

我所拥有的:

        Observable<ResponseBody> api1Call = api1.fetchData();
        Observable<ResponseBody> api2Call = api2.fetchData();
        Observable<ResponseBody> api3Call = api3.fetchData();

        Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
            @Override
            public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
                Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
                return null;
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                Logger.e(throwable, "some error with one of the apis?");
                return Observable.empty();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onCompleted() {
                        Logger.i("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Logger.e(e, "onError");
                    }

                    @Override
                    public void onNext(Object o) {
                        Logger.i("onNext " + o);
                    }
                });

我得到的输出:

some error with one of the apis?
// stacktrace of the error
onCompleted

我是RxJava的新手,非常困惑.我在StackOverflow上找到了一些答案,说zip做类似的事情,但是离我的要求还差得远.我猜组合"运算符之一+适当的异常处理将满足我的需求.到目前为止,真的很难弄清楚

I'm new to RxJava and very confused. I found some answers on StackOverflow saying zip does similar thing but it's even further from my requirements. I'm guessing one of the "combine" operators + proper exception handing will do what I need. It's just been really hard to figure it out so far

我正在使用的版本:

compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'

推荐答案

感谢@TinTran和

Thanks to @TinTran and this, here is the correct solution:
(I can't put up the exact syntax for Retrofit Observables now but that shouldn't matter, logic remains the same Retrofit or not)

Observable.mergeDelayError(getData1(), getData2()).doAfterTerminate(new Action0() {
            @Override
            public void call() {
                Logger.i("end of all streams");
                tvTheText.setText("all streams finished");
            }
        }).subscribe(new PrintSubscriber<>("merge" +
                " delay w error"));

可观察对象(改造对象应以相同的方式工作):

The observables (Retrofit ones should work the same way):

private Observable<String> getData1() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> singleSubscriber) {
                try {
                    long responseTime = 120 + new Random().nextInt(30);
                    Thread.sleep(responseTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                singleSubscriber.onNext("data 1");
                singleSubscriber.onCompleted();
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
    }

    private Observable<String> getData2() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> singleSubscriber) {
                try {
                    long responseTime = 100 + new Random().nextInt(19);
                    Thread.sleep(responseTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                singleSubscriber.onError(new Exception());// this one never blocks the other Observables' streams
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
    }

输出日志:

    10-24 15:27:23.335 D: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.335 D: │ Thread: main
10-24 15:27:23.335 D: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.335 D: │ SafeSubscriber.onNext  (SafeSubscriber.java:134)
10-24 15:27:23.335 D: │    PrintSubscriber.onNext  (PrintSubscriber.java:32)
10-24 15:27:23.335 D: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.336 D: │ merge delay w error - onNext - data 1
10-24 15:27:23.336 D: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.342 V: ⇢ onError(e=java.lang.Exception)
10-24 15:27:23.342 V: ⇠ onError [0ms]
10-24 15:27:23.343 I: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.343 I: │ Thread: main
10-24 15:27:23.343 I: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.343 I: │ OperatorDoAfterTerminate$1.callAction  (OperatorDoAfterTerminate.java:73)
10-24 15:27:23.343 I: │    MainActivity$1.call  (MainActivity.java:37)
10-24 15:27:23.343 I: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.344 I: │ end of all streams
10-24 15:27:23.344 I: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────

这篇关于在错误情况下如何组合多个RxJava链非阻塞的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆