How to combine multiple RxJava chains non-blocking in error case
Problem description
My requirements:
- N Retrofit calls in parallel
- Wait for all calls to finish (success or failure)
- If k (0 <= k < N) calls fail, they should NOT block the others. Imagine that failed calls can return null or error objects, while successful ones return ResponseBody objects.
- Optional: if RxJava has an easy way to distinguish which call is which for each success or failure, great; if not, I'll parse the responses and figure it out myself.
What I have:
Observable<ResponseBody> api1Call = api1.fetchData();
Observable<ResponseBody> api2Call = api2.fetchData();
Observable<ResponseBody> api3Call = api3.fetchData();

// A combiner over three sources must be a Func3 (three inputs, one result), not a Func2.
Observable.combineLatest(api1Call, api2Call, api3Call, new Func3<ResponseBody, ResponseBody, ResponseBody, Object>() {
    @Override
    public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
        Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
        return null;
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
    @Override
    public Observable<?> call(Throwable throwable) {
        // Reached as soon as any one source fails; the combined stream is then replaced by an empty one.
        Logger.e(throwable, "some error with one of the apis?");
        return Observable.empty();
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {
                Logger.i("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Logger.e(e, "onError");
            }

            @Override
            public void onNext(Object o) {
                Logger.i("onNext " + o);
            }
        });
The output I get:
some error with one of the apis?
// stacktrace of the error
onCompleted
I'm new to RxJava and very confused. I found some answers on StackOverflow saying zip does a similar thing, but it's even further from my requirements. I'm guessing one of the "combine" operators plus proper exception handling will do what I need. It has just been really hard to figure out so far.
Versions I'm using:
compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
Recommended answer
Thanks to @TinTran and this, here is the correct solution:
(I can't post the exact syntax for the Retrofit Observables right now, but that shouldn't matter: the logic is the same with or without Retrofit.)
Observable.mergeDelayError(getData1(), getData2())
        .doAfterTerminate(new Action0() {
            @Override
            public void call() {
                Logger.i("end of all streams");
                tvTheText.setText("all streams finished");
            }
        })
        .subscribe(new PrintSubscriber<>("merge delay w error"));
The observables (Retrofit ones should work the same way):
private Observable<String> getData1() {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> singleSubscriber) {
            try {
                long responseTime = 120 + new Random().nextInt(30);
                Thread.sleep(responseTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            singleSubscriber.onNext("data 1");
            singleSubscriber.onCompleted();
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
}

private Observable<String> getData2() {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> singleSubscriber) {
            try {
                long responseTime = 100 + new Random().nextInt(19);
                Thread.sleep(responseTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            singleSubscriber.onError(new Exception()); // this one never blocks the other Observables' streams
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
}
Output log:
10-24 15:27:23.335 D: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.335 D: │ Thread: main
10-24 15:27:23.335 D: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.335 D: │ SafeSubscriber.onNext (SafeSubscriber.java:134)
10-24 15:27:23.335 D: │ PrintSubscriber.onNext (PrintSubscriber.java:32)
10-24 15:27:23.335 D: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.336 D: │ merge delay w error - onNext - data 1
10-24 15:27:23.336 D: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.342 V: ⇢ onError(e=java.lang.Exception)
10-24 15:27:23.342 V: ⇠ onError [0ms]
10-24 15:27:23.343 I: ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────
10-24 15:27:23.343 I: │ Thread: main
10-24 15:27:23.343 I: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.343 I: │ OperatorDoAfterTerminate$1.callAction (OperatorDoAfterTerminate.java:73)
10-24 15:27:23.343 I: │ MainActivity$1.call (MainActivity.java:37)
10-24 15:27:23.343 I: ├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄
10-24 15:27:23.344 I: │ end of all streams
10-24 15:27:23.344 I: └────────────────────────────────────────────────────────────────────────────────────────────────────────────────
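The log shows that the failing stream does not stop "data 1" from arriving, but the merged stream does not say which call produced which result (the optional requirement in the question). As a rough illustration of that idea only, here is a plain-JDK sketch that uses CompletableFuture instead of RxJava: every call runs in parallel, a failure is captured per call, and the combined result completes only when all calls have finished. The names SettleAll, Outcome and settleAll are hypothetical and do not come from RxJava or Retrofit. In RxJava 1 a comparable effect could presumably be reached by mapping each source into such a wrapper and applying onErrorReturn before combining, but that variant is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class SettleAll {

    /** Result of one call; error is non-null only when the call failed. */
    public static final class Outcome<T> {
        public final int index;        // position of the call in the input list
        public final T value;          // result of a successful call
        public final Throwable error;  // cause of a failed call

        Outcome(int index, T value, Throwable error) {
            this.index = index;
            this.value = value;
            this.error = error;
        }

        public boolean isSuccess() {
            return error == null;
        }
    }

    /**
     * Starts every call asynchronously and completes once all of them have
     * finished. A failing call is recorded as an Outcome and never
     * short-circuits the others.
     */
    public static <T> CompletableFuture<List<Outcome<T>>> settleAll(List<Supplier<T>> calls) {
        List<CompletableFuture<Outcome<T>>> futures = new ArrayList<>();
        for (int i = 0; i < calls.size(); i++) {
            final int index = i;
            futures.add(CompletableFuture
                    .supplyAsync(calls.get(i))
                    // handle() receives either a value or an error, so the
                    // resulting future always completes normally
                    .handle((value, error) -> new Outcome<T>(index, value, unwrap(error))));
        }
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<Outcome<T>> outcomes = new ArrayList<>();
                    for (CompletableFuture<Outcome<T>> future : futures) {
                        outcomes.add(future.join()); // already complete, returns immediately
                    }
                    return outcomes;
                });
    }

    /** Strips the CompletionException wrapper that async stages may add. */
    private static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    public static void main(String[] args) {
        List<Supplier<String>> calls = new ArrayList<>();
        calls.add(() -> "data 1");
        calls.add(() -> { throw new IllegalStateException("call 2 failed"); });
        calls.add(() -> "data 3");

        // join() blocks only in this demo; a real caller would chain thenAccept(...)
        for (Outcome<String> outcome : settleAll(calls).join()) {
            System.out.println(outcome.index + ": "
                    + (outcome.isSuccess() ? outcome.value : "failed: " + outcome.error));
        }
    }
}
```

Outcomes come back in the same order as the input list, so the index identifies the call regardless of which one finished first. The difference from mergeDelayError is one of shape: here the failures travel as data inside the result list, whereas mergeDelayError still signals onError after all sources have terminated.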