可观察,仅在完成时重试错误和缓存 [英] Observable, retry on error and cache only if completed

查看:32
本文介绍了可观察,仅在完成时重试错误和缓存的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们可以使用 cache() 操作符来避免多次执行长任务(http 请求),并重用其结果:

we can use the cache() operator to avoid executing a long task (http request) multiple times, and reuse its result:

Observable apiCall = createApiCallObservable().cache(); // notice the .cache()

---------------------------------------------
// the first time we need it
apiCall.andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again
apiCall.andSomeDifferentStuff()
               .subscribe(subscriberB);

第一次,http 请求被执行,但是第二次,由于我们使用了 cache() 操作符,请求不会被执行,但我们将能够重用第一个结果.

The first time, the http request is executed, but the second time, since we used the cache() operator, the request won't be executed but we'll be able to reuse the first result.

当第一个请求成功完成时,这工作正常.但是如果 onError 在第一次尝试中被调用,那么下次新订阅者订阅同一个 observable 时,onError 将再次被调用,而不会再次尝试 http 请求.

This works fine when the first request completes successfully. But if onError is called in the first attempt, then the next time that a new subscriber subscribes to the same observable, the onError will be called again without attempting the http request again.

我们试图做的是,如果第一次调用 onError ,那么下次有人订阅同一个 observable 时,将从头开始尝试 http 请求.即 observable 将只缓存成功的 api 调用,即调用 onCompleted 的那些调用.

What we are trying to do is, that if onError is called the first time, then the next time that someone subscribes to the same observable, the http request will be attempted from scratch. ie the observable will cache only the successful api calls, ie those for which the onCompleted was called.

关于如何进行的任何想法?我们尝试过使用 retry() 和 cache() 运算符,但运气不佳.

Any ideas about how to proceed? We've tried using the retry() and cache() operators with no much luck.

推荐答案

这是我们在扩展 akarnokd 的解决方案后得到的解决方案:

This is the solution we ended up with, after extending akarnokd's solution:

public class OnErrorRetryCache<T> {

    public static <T> Observable<T> from(Observable<T> source) {
         return new OnErrorRetryCache<>(source).deferred;
    }

    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);

    private Observable<T> cache = null;
    private Observable<T> inProgress = null;

    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) 
    {
        singlePermit.acquireUninterruptibly();

        Observable<T> cached = cache;
        if (cached != null) {
            singlePermit.release();
            return cached;
        }

        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();

        return inProgress;
    }

    private void onSuccess() {
        cache = inProgress;
    }

    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }
}

我们需要缓存来自 Retrofit 的 http 请求的结果.所以这是创建的,考虑到一个发出单个项目的 observable.

We needed to cache the result of an http request from Retrofit. So this was created, with an observable that emits a single item in mind.

如果在执行 http 请求时观察者订阅了,我们希望它等待并且不要执行请求两次,除非正在进行的请求失败.为此,信号量允许对创建或返回缓存的 observable 的块进行单次访问,​​如果创建了新的 observable,我们会等待该 observable 终止.可以在此处

If an observer subscribed while the http request was being executed, we wanted it to wait and not execute the request twice, unless the in-progress one failed. To do that the semaphore allows single access to the block that creates or returns the cached observable, and if a new observable is created, we wait until that one terminates. Tests for the above can be found here

这篇关于可观察,仅在完成时重试错误和缓存的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆