Rxjava retryWhen 立即调用 [英] Rxjava retryWhen called instantly

查看:30
本文介绍了Rxjava retryWhen 立即调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对 rxjava 有一个非常具体的问题或误解,希望有人可以提供帮助.

I'm having a very specific problem or misunderstanding with rxjava that someone hopefully can help with.

我正在运行 rxjava 2.1.5 并具有以下代码片段:

I'm running rxjava 2.1.5 and have the following code snippet:

public static void main(String[] args) {

    final Observable<Object> observable = Observable.create(emitter -> {
        // Code ... 
    });

    observable.subscribeOn(Schedulers.io())
            .retryWhen(error -> {
                System.out.println("retryWhen");
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"));

}

执行后,程序打印:

retryWhen

Process finished with exit code 0

我的问题,但我不明白的是:为什么在订阅 Observable 时立即调用 retryWhen?可观察对象什么都不做.

My question, and what I don't understand is: Why is retryWhen called instantly upon subscribing to an Observable? The observable does nothing.

我想要的是在发射器上调用 onError 时调用 retryWhen .我误解了 rx 的工作原理吗?

What I want is retryWhen to be called when onError is called on the emitter. Am I misunderstanding how rx works?

谢谢!

添加新代码段:

public static void main(String[] args) throws InterruptedException {

    final Observable<Object> observable = Observable.create(emitter -> {
        emitter.onNext("next");
        emitter.onComplete();
    });

    final CountDownLatch latch = new CountDownLatch(1);
    observable.subscribeOn(Schedulers.io())
            .doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
            .retryWhen(error -> {
                System.out.println("retryWhen: " + error.toString());
                return error.retry();
            }).subscribe(next -> System.out.println("subscribeNext"),
                         error -> System.out.println("subscribeError"),
                         () -> latch.countDown());

    latch.await();
}

发射器 onNext 和 complete 被调用.DoOnError 永远不会被调用.输出为:

Emitter onNext and complete is called. DoOnError is never called. Output is:

重试时间:io.reactivex.subjects.SerializedSubject@35fb3008订阅下一个

retryWhen: io.reactivex.subjects.SerializedSubject@35fb3008 subscribeNext

进程结束,退出代码 0

Process finished with exit code 0

推荐答案

retryWhenObserver 订阅时调用提供的函数,这样你就有一个主序列伴随着一个发出 Throwable 主序列失败的序列.您应该将逻辑组合到您在此 Function 中获得的 Observable 上,因此最后,一个 Throwable 将在另一端产生一个值.

retryWhen calls the provided function when an Observer subscribes to it so you have a main sequence accompanied by a sequence that emits the Throwable the main sequence failed with. You should compose a logic onto the Observable you get in this Function so at the end, one Throwable will result in a value on the other end.

Observable.error(new IOException())
    .retryWhen(e -> {
         System.out.println("Setting up retryWhen");
         int[] count = { 0 };
         return e
            .takeWhile(v -> ++count[0] < 3)
            .doOnNext(v -> { System.out.println("Retrying"); });
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

由于 e ->{ } 函数体针对每个单独的订阅者执行,您可以安全地拥有每个订阅者的状态,例如重试计数器.

Since the e -> { } function body is executed for each individual subscriber, you can have a per subscriber state such as retry counter safely.

使用 e ->e.retry() 没有效果,因为输入错误流永远不会调用它的 onError.

Using e -> e.retry() has no effect because the input error flow never gets its onError called.

这篇关于Rxjava retryWhen 立即调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆