使用RxJava缓存网络调用一段时间 [英] Caching network calls using RxJava for some duration

查看:53
本文介绍了使用RxJava缓存网络调用一段时间的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Retorfit + RxJava2 建立网络,我想将响应缓存30秒.30秒间隔后进行的任何呼叫都应从服务器获取最新结果.我尝试使用 Replay 运算符执行此操作,但是每次我呼叫订阅时,它仍然会进行网络通话.我不是RxJava的专家,所以也许我对使用 Replay 进行缓存这样的理解是错误的.

I am making a network using Retorfit + RxJava2 and I want to cache the response for 30 seconds. Any calls made after 30 seconds interval should get the latest results from server. I tried doing this using Replay operator but it still makes a network call every time I call subscribe. I am not an expert in RxJava so maybe my understanding of using Replay for caching like that is wrong.

public Observable<Name> getName() {
        return retrofitBuilder.getName()
                .subscribeOn(Schedulers.io())
                .replay(30, TimeUnit.SECONDS,Schedulers.io())
                .autoConnect();
    }

我正在这样调用上面的代码:

and I am calling the above code like this:

 service.getName()
        .subscribe(new Consumer<Name>()
            {
                @Override
                public void accept(Name name) throws Exception
                {
                    Log.d("getName", "Name: " + name.toString());
                }
            }
            , new Consumer<Throwable>()
            {
                @Override
                public void accept(Throwable throwable) throws Exception
                {
                    Log.d("getName", throwable.getMessage());
                }
            });

更新:如果我没有清楚解释我的问题,我表示歉意.我想要的是在特定请求上缓存,而不是在 HttpClient 级别上缓存它,该策略将缓存策略应用于通过它发出的所有请求.最后,我想在需要时为不同的请求定义不同的缓存过期时间.并非我的所有请求都需要在短时间内进行缓存.我想知道我是否能做到这一点.

UPDATE: My apology if I didn't explain my question clearly. What I want is caching on a particular request instead of caching it on HttpClient level which applies the caching strategy to all the request being made through it. In the end I would like to define different caching expiration for different request when needed. Not all my request needs caching for small duration. I was wondering if I could do just that.

感谢您的帮助.

推荐答案

您的方法有两个问题:

  1. 正如@drhr所述,您每次调用 service.getName()时都在创建一个新的 Observable ,您正在创建一个 Observable <的新实例./code>,则应保留相同的重播实例,并在每次调用 service.getName()时将其提供给同一实例之外的调用方.
  2. 即使您将返回相同的实例,在30秒内重播 replay ,也会重播源 Observable 在过去30秒内(即缓存过期后)发出的序列时间,您将一无所获,因为您的请求发生在30秒前.这并不意味着 Observable 在此时间段后将自动重新启动.
  1. as @drhr mentioned, you are creating a new Observable each time you call service.getName() you're creating a new instance of Observable, you should keep the same replayed instance and give to the caller outside the same instance each time it calls service.getName().
  2. even if you will return the same instance, replay with 30 seconds, will replay the sequence emitted by the source Observable over the last 30 sec, meaning after cache expiration time, you will get nothing as your request happened more than 30 sec ago. it doesn't mean that the Observable will restart automatically after this period.

为了缓存特定时间段,基本上,您需要在缓存时间段之后使缓存的响应无效,并在此时间段之后执行新请求,这意味着您应该控制您的订阅,并在那里进行.
您可以通过以下方式实现它:

In order to cache for specific period, you basically need to invalidate the cached response after cache period, and perform new request after this period, that's mean you should control your subscribe, and do it there.
You can achieve it with something like that:

public class CachedRequest<T> {

    private final AtomicBoolean expired = new AtomicBoolean(true);
    private final Observable<T> source;
    private final long cacheExpirationInterval;
    private final TimeUnit cacheExpirationUnit;
    private Observable<T> current;

    public CachedRequest(Observable<T> o, long cacheExpirationInterval,
                         TimeUnit cacheExpirationUnit) {
        source = o;
        current = o;
        this.cacheExpirationInterval = cacheExpirationInterval;
        this.cacheExpirationUnit = cacheExpirationUnit;
    }

    private Observable<T> getCachedObservable() {
        return Observable.defer(() -> {
            if (expired.compareAndSet(true, false)) {
                current = source.cache();
                Observable.timer(cacheExpirationInterval, cacheExpirationUnit)                          
                        .subscribe(aLong -> expired.set(true));
            }
            return current;
        });
    }
}

使用defer可以根据缓存过期状态返回正确的 Observable ,因此缓存过期期间发生的每个订阅都将被缓存 Observable (使用 cache())-意味着请求将只执行一次.缓存过期后,其他订阅将触发新请求,并将设置新的计时器以重置缓存过期.

with defer you can return the right Observable according to cache expiration status, so every subscribe happened within the cache expiration will get cached Observable (using cache()) - meaning request will be performed only once. after cache expiration, additional subscribe will trigger new request and will set a new timer to reset the cache expiration.

这篇关于使用RxJava缓存网络调用一段时间的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆