RxJava retryWhen 运算符未按预期工作 [英] RxJava retryWhen operator is not working as expected

查看:46
本文介绍了RxJava retryWhen 运算符未按预期工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有 ListobservableList,其中包含长度为 1 到 10 的随机 CharSequence.

我有这样的Observable:

Observable.from(observableList).flatMap(new Func1>() {@覆盖公共 Observable调用(CharSequence charSequence){如果 (charSequence.length() == 1) {return Observable.error(new RuntimeException("太短"));} 别的 {返回 Observable.just(charSequence);}}}).retryWhen(new Func1, Observable>() {@覆盖公共 Observablecall(final Observable observable) {return observable.flatMap(new Func1>() {@覆盖公共 Observable调用(可扔可扔){if (backoffStrategy.isApplicable(throwable)) {Log.d(MainActivity.class.getSimpleName(), "适当的throwable被抛出!");返回 backoffStrategy.call(observable);}返回可观察的;}});}})

当序列长度为1时,observable抛出异常.在retryWhenFunc1里面,我想检查错误类型(RuntimeException只是一个例子)并选择合适的重试策略.

这是退避策略:

公共类 BaseBackoffStrategy 实现 BackoffStrategy {@覆盖公共 Observablecall(Observable 尝试) {返回尝试.zipWith(Observable.range(1, 3 + 1), new ObservableUtils.RxPair()).flatMap(new Func1, Observable>() {@覆盖公共 Observablecall(Pair ti) {如果(ti.second <= 3){System.out.println(new Date().toGMTString() + " : " + ti.second + " retry");返回 Observable.timer((long) Math.pow(2, ti.second), TimeUnit.SECONDS);} 别的 {返回 Observable.error(ti.first);}}});}@覆盖public boolean isApplicable(Throwable throwable) {返回 RuntimeException.class.isInstance(throwable);}}私有接口 BackoffStrategy 扩展了 Func1

该函数只返回一个Pair 对象:

公共类 ObservableUtils {公共静态类 RxPair<T1,T2>实现 Func2<T1,T2,Pair<T1,T2>{@覆盖公共对<T1,T2>呼叫(T1 t1,T2 t2){返回 Pair.of(t1, t2);}}}

控制台的输出是:

D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:34 GMT:1 次重试D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:36 GMT:1 次重试I/System.out:2016 年 10 月 6 日 07:34:36 GMT:2 次重试D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:38 GMT:1 次重试I/System.out:2016 年 10 月 6 日 07:34:38 GMT:3 次重试I/System.out:2016 年 10 月 6 日 07:34:38 GMT:2 次重试D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:40 GMT:1 次重试I/System.out:2016 年 10 月 6 日 07:34:40 GMT:3 重试I/System.out:2016 年 10 月 6 日 07:34:40 GMT:2 次重试D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:40 GMT:1 次重试I/System.out:2016 年 10 月 6 日 07:34:40 GMT:3 重试I/System.out:2016 年 10 月 6 日 07:34:40 GMT:2 次重试D/MainActivity: onError

但我希望 Observable 在指定时间后重试.下一个异常抛出时,时间应该会变长.控制台的输出应如下所示:

D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:34 GMT:1 次重试D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:36 GMT:2 次重试D/MainActivity:适当的throwable被抛出!I/System.out:2016 年 10 月 6 日 07:34:38 GMT:3 次重试D/MainActivity:适当的throwable被抛出!D/MainActivity: onError

<块引用>

我的问题是:我在这里做错了什么?我可以从 Func1 调用其他 Func1 吗?

解决方案

  1. 当使用 RxJava 时,最好使用 lamdas 而不是内部类.您的代码变得更具可读性.

  2. 我觉得你这里有误return backoffStrategy.call(observable);

每当 retryWhen 发出异常时,您都会重新创建 zip + timer.这就是为什么你有像

这样的单、双、三等消息

I/System.out:2016 年 10 月 6 日 07:34:40 GMT:1 次重试

  1. 这是糟糕的设计

    私有接口BackoffStrategy扩展 Func1

为什么需要在创建另一个序列的类中实现 rx 接口?

我使用 lambdas 稍微重写了您的代码,并将代码从 BackoffStrategy.call 移动到序列中

BackoffStrategy backoffStrategy = new BaseBackoffStrategy();列表observableList = Arrays.asList("a");Observable.from(observableList).flatMap(charSequence -> {如果 (charSequence.length() == 1) {return Observable.error(new RuntimeException("太短"));} 别的 {返回 Observable.just(charSequence);}}).retryWhen(observable ->可观察的.filter(backoffStrategy::isApplicable).doOnNext(next -> System.out.println("合适的可抛出对象被抛出!")).zipWith(Observable.range(1, 3 + 1), Tuple::new).flatMap(元组 -> {整数尝试 = tuple.getRight();如果(尝试 <= 3){System.out.println(new Date().toGMTString() + " : " + 尝试 + " 重试");返回 Observable.timer((long) Math.pow(2, 尝试), TimeUnit.SECONDS);} 别的 {返回 Observable.error(tuple.getLeft());}})).toBlocking().订阅(下一个 ->System.out.println("下一个:" + next),错误 ->System.out.println("错误:" + 错误),() ->System.out.println("完成"));

输出是

适当的throwable被抛出!2016 年 10 月 7 日 12:24:24 GMT:1 次重试适当的throwable被抛出!2016 年 10 月 7 日 12:24:26 GMT:2 次重试适当的throwable被抛出!2016 年 10 月 7 日 12:24:30 GMT:3 次重试适当的throwable被抛出!错误:java.lang.RuntimeException:太短

Tuple

 私有静态类元组{私人最后T离开;私人最终V权;公共元组(T左,V右){this.left = 左;this.right = 正确;}公共 T getLeft() {向左返回;}公共 V getRight() {返回权;}}

Suppose I have List<CharSequence> observableList, which contains random CharSequence's which have length from 1 to 10.

I have such Observable:

Observable.from(observableList)
        .flatMap(new Func1<CharSequence, Observable<CharSequence>>() {
            @Override
            public Observable<CharSequence> call(CharSequence charSequence) {
                if (charSequence.length() == 1) {
                    return Observable.error(new RuntimeException("Too short"));
                } else {
                    return Observable.just(charSequence);
                }
            }
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(final Observable<? extends Throwable> observable) {
        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                if (backoffStrategy.isApplicable(throwable)) {
                    Log.d(MainActivity.class.getSimpleName(), "Appropriate throwable is thrown!");
                    return backoffStrategy.call(observable);
                }
                return observable;
            }
        });
    }
})

The observable throws exception, when the sequence length is 1. Inside retryWhen's Func1 I would like to check error type (RuntimeException is just an example) and choose appropriate retry strategy.

This is the backoff strategy:

public class BaseBackoffStrategy implements BackoffStrategy {

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts.zipWith(Observable.range(1, 3 + 1), new ObservableUtils.RxPair<Throwable, Integer>())
                .flatMap(new Func1<Pair<Throwable, Integer>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Pair<Throwable, Integer> ti) {
                        if (ti.second <= 3) {
                            System.out.println(new Date().toGMTString() + " : " + ti.second + " retry");
                            return Observable.timer((long) Math.pow(2, ti.second), TimeUnit.SECONDS);
                        } else {
                            return Observable.error(ti.first);
                        }
                    }
                });
    }

    @Override
    public boolean isApplicable(Throwable throwable) {
        return RuntimeException.class.isInstance(throwable);
    }
}

private interface BackoffStrategy extends Func1<Observable<? extends Throwable>, Observable<?>> {
    boolean isApplicable(Throwable throwable);

}

And the function just returning a Pair of objects:

public class ObservableUtils {

    public static class RxPair<T1, T2> implements Func2<T1, T2, Pair<T1, T2>> {
        @Override
        public Pair<T1, T2> call(T1 t1, T2 t2) {
            return Pair.of(t1, t2);
        }
    }
}

The ouput from the console is:

D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:34 GMT : 1 retry
D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:36 GMT : 1 retry
I/System.out: 6 Oct 2016 07:34:36 GMT : 2 retry
D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:38 GMT : 1 retry
I/System.out: 6 Oct 2016 07:34:38 GMT : 3 retry
I/System.out: 6 Oct 2016 07:34:38 GMT : 2 retry
D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:40 GMT : 1 retry
I/System.out: 6 Oct 2016 07:34:40 GMT : 3 retry
I/System.out: 6 Oct 2016 07:34:40 GMT : 2 retry
D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:40 GMT : 1 retry
I/System.out: 6 Oct 2016 07:34:40 GMT : 3 retry
I/System.out: 6 Oct 2016 07:34:40 GMT : 2 retry
D/MainActivity: onError

But I would like the Observable to retry after the specified time. The time should become longer, when the next exception is thrown. The output from console should look like:

D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:34 GMT : 1 retry
D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:36 GMT : 2 retry
D/MainActivity: Appropriate throwable is thrown!
I/System.out: 6 Oct 2016 07:34:38 GMT : 3 retry
D/MainActivity: Appropriate throwable is thrown!
D/MainActivity: onError

My question is: what am I doing wrong here? Can I call other Func1 from Func1?

解决方案

  1. When using RxJava it is better use lamdas rather inner classes. You code become more readable.

  2. I think you have a mistake here return backoffStrategy.call(observable);

Every time when retryWhen emits an exception you recreate zip + timer. That's why you have single, double, triple and etc. messages like

I/System.out: 6 Oct 2016 07:34:40 GMT : 1 retry

  1. This is bad design

    private interface BackoffStrategy 
        extends Func1<Observable<? extends Throwable>, Observable<?>> {
    

Why do you need implement rx interfaces in the class which just creates another sequence?

I rewrite your code a little bit using lambdas and move code from BackoffStrategy.call into sequence

BackoffStrategy backoffStrategy = new BaseBackoffStrategy();
List<CharSequence> observableList = Arrays.asList("a");

Observable.from(observableList)
        .flatMap(charSequence -> {
            if (charSequence.length() == 1) {
                return Observable.error(new RuntimeException("Too short"));
            } else {
                return Observable.just(charSequence);
            }
        })
        .retryWhen(observable ->
                observable
                        .filter(backoffStrategy::isApplicable)
                        .doOnNext(next -> System.out.println("Appropriate throwable is thrown!"))
                        .zipWith(Observable.range(1, 3 + 1), Tuple::new)
                        .flatMap(tuple -> {
                            Integer attempts = tuple.getRight();
                            if (attempts <= 3) {
                                System.out.println(new Date().toGMTString() + " : " + attempts + " retry");
                                return Observable.timer((long) Math.pow(2, attempts), TimeUnit.SECONDS);
                            } else {
                                return Observable.error(tuple.getLeft());
                            }
                        })
        )
        .toBlocking()
        .subscribe(
                next -> System.out.println("Next: " + next),
                error -> System.out.println("Error: " + error),
                () -> System.out.println("Completed")
        );

The output is

Appropriate throwable is thrown!
7 Oct 2016 12:24:24 GMT : 1 retry
Appropriate throwable is thrown!
7 Oct 2016 12:24:26 GMT : 2 retry
Appropriate throwable is thrown!
7 Oct 2016 12:24:30 GMT : 3 retry
Appropriate throwable is thrown!
Error: java.lang.RuntimeException: Too short

The Tuple class

   private static class Tuple<T, V> {
        private final T left;
        private final V right;

        public Tuple(T left, V right) {
            this.left = left;
            this.right = right;
        }

        public T getLeft() {
            return left;
        }

        public V getRight() {
            return right;
        }
    }

这篇关于RxJava retryWhen 运算符未按预期工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆