Rxjava retryWhen 立即调用 [英] Rxjava retryWhen called instantly
问题描述
我对 rxjava 有一个非常具体的问题或误解,希望有人可以提供帮助.
I'm having a very specific problem or misunderstanding with rxjava that someone hopefully can help with.
我正在运行 rxjava 2.1.5 并具有以下代码片段:
I'm running rxjava 2.1.5 and have the following code snippet:
public static void main(String[] args) {
final Observable<Object> observable = Observable.create(emitter -> {
// Code ...
});
observable.subscribeOn(Schedulers.io())
.retryWhen(error -> {
System.out.println("retryWhen");
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"));
}
执行后,程序打印:
retryWhen
Process finished with exit code 0
我的问题,但我不明白的是:为什么在订阅 Observable 时立即调用 retryWhen?可观察对象什么都不做.
My question, and what I don't understand is: Why is retryWhen called instantly upon subscribing to an Observable? The observable does nothing.
我想要的是在发射器上调用 onError 时调用 retryWhen .我误解了 rx 的工作原理吗?
What I want is retryWhen to be called when onError is called on the emitter. Am I misunderstanding how rx works?
谢谢!
添加新代码段:
public static void main(String[] args) throws InterruptedException {
final Observable<Object> observable = Observable.create(emitter -> {
emitter.onNext("next");
emitter.onComplete();
});
final CountDownLatch latch = new CountDownLatch(1);
observable.subscribeOn(Schedulers.io())
.doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
.retryWhen(error -> {
System.out.println("retryWhen: " + error.toString());
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"),
() -> latch.countDown());
latch.await();
}
发射器 onNext 和 complete 被调用.DoOnError 永远不会被调用.输出为:
Emitter onNext and complete is called. DoOnError is never called. Output is:
重试时间:io.reactivex.subjects.SerializedSubject@35fb3008订阅下一个
retryWhen: io.reactivex.subjects.SerializedSubject@35fb3008 subscribeNext
进程结束,退出代码 0
Process finished with exit code 0
推荐答案
retryWhen
在 Observer
订阅时调用提供的函数,这样你就有一个主序列伴随着一个发出 Throwable
主序列失败的序列.您应该将逻辑组合到您在此 Function
中获得的 Observable
上,因此最后,一个 Throwable
将在另一端产生一个值.
retryWhen
calls the provided function when an Observer
subscribes to it so you have a main sequence accompanied by a sequence that emits the Throwable
the main sequence failed with. You should compose a logic onto the Observable
you get in this Function
so at the end, one Throwable
will result in a value on the other end.
Observable.error(new IOException())
.retryWhen(e -> {
System.out.println("Setting up retryWhen");
int[] count = { 0 };
return e
.takeWhile(v -> ++count[0] < 3)
.doOnNext(v -> { System.out.println("Retrying"); });
})
.subscribe(System.out::println, Throwable::printStackTrace);
由于 e ->{ }
函数体针对每个单独的订阅者执行,您可以安全地拥有每个订阅者的状态,例如重试计数器.
Since the e -> { }
function body is executed for each individual subscriber, you can have a per subscriber state such as retry counter safely.
使用 e ->e.retry()
没有效果,因为输入错误流永远不会调用它的 onError
.
Using e -> e.retry()
has no effect because the input error flow never gets its onError
called.
这篇关于Rxjava retryWhen 立即调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!