Andorid rxJava:如何从缓存中获取数据并同时在后台对其进行更新? [英] Andorid rxJava: how to get data from cache and and the same time update it in the background?
问题描述
我刚刚开始学习Android的 rxJava ,并希望实现常见用例:
I just start learning rxJava for Android and want to implement the common use case:
- 从缓存中请求数据并显示给用户
- 从网络请求数据
- 服务器更新存储中的数据并自动将其显示给用户
传统上最好的方案是使用 CursorLoader 从缓存中获取数据,在单独的线程中运行Web请求,并通过内容提供商将数据保存到磁盘,内容提供商会自动通知侦听器,并 CursorLoader 会自动更新用户界面.
Traditionally on of the best scenarios was use CursorLoader to get data from cache, run web request in the separate thread and save data to the disk via content provider, content provider automatically notify the listener and CursorLoader autoupdate UI.
在rxJava中,我可以通过运行两个不同的Observer来做到这一点,如下面的代码所示,但是我找不到如何将这两个调用组合为一个以达到我的目标的方法.Googling显示了该线程,但是看起来它只是从缓存中获取数据或从Web服务器中获取数据,但不能同时执行
In rxJava I can do it by running two different Observers as you can see in code below, but I don't find the way how to combine this two calls into the one to reach my aim. Googling shows this thread but it looks like it just get data from the cache or data from the web server, but don't do both RxJava and Cached Data
代码段:
@Override
public Observable<SavingsGoals> getCachedSavingsGoal() {
return observableGoal.getSavingsGoals()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
@Override
public Observable<SavingsGoals> getRecentSavingsGoal() {
return api.getSavingsGoals()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
model.getCachedSavingsGoal().subscribe(new Observer<SavingsGoals>() {
@Override
public void onCompleted() {
// no op
}
@Override
public void onError(Throwable e) {
Log.e(App.TAG, "Failed to consume cached data");
view.showError();
}
@Override
public void onNext(SavingsGoals savingsGoals) {
Log.d(App.TAG, "Show the next item");
if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
view.showData(savingsGoals.getSavingsGoals());
} else {
view.showError();
}
}
});
model.getRecentSavingsGoal().subscribe(new Observer<SavingsGoals>() {
@Override
public void onCompleted() {
// no op
}
@Override
public void onError(Throwable e) {
Log.e(App.TAG, "Failed to consume data from the web", e);
view.showError();
}
@Override
public void onNext(SavingsGoals savingsGoals) {
if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
view.showData(savingsGoals.getSavingsGoals());
} else {
view.showError();
}
}
});
此外,当前方法的问题之一是缓存,并且不保证Web数据可以顺序运行.过时的数据有可能是最新的,而从Web覆盖的则是最新的.
Also, the one of issues with current approach is cache and web data are not garranted to be run sequently. It is possible when outdated data will come as latest and override recent from web.
为解决此问题,我实现了带时间戳过滤的Observer合并与合并:它从缓存中获取数据,将其传递给下一个观察者,如果缓存已过期,则向网络发起新的调用-通过时间戳过滤解决线程竞争的情况.但是,这种方法的问题是我无法从此Observable返回缓存数据-我需要等待两个请求完成其工作.
To solve this issue I implemented Observer merge with filtration by timestamp: it get data from cache, pass it to the next observer and if cache is outdated fire new call to the web - case for thread competition solved by the filtration with timestamps. However, the issue with this approach I can not return cache data from this Observable - I need to wait when both requests finish their work.
代码段.
@Override
public Observable<Timestamped<SavingsGoals>> getSavingGoals() {
return observableGoal
.getTimestampedSavingsGoals()
.subscribeOn(Schedulers.io())
.flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
@Override
public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> cachedData) {
Log.d(App.FLOW, "getTimestampedSavingsGoals");
return getGoalsFromBothSources()
.filter(filterResponse(cachedData));
}
})
.subscribeOn(AndroidSchedulers.mainThread());
}
private Func1<Timestamped<SavingsGoals>, Boolean> filterResponse(Timestamped<SavingsGoals> cachedData) {
return new Func1<Timestamped<SavingsGoals>, Boolean>() {
@Override
public Boolean call(Timestamped<SavingsGoals> savingsGoals) {
return savingsGoals != null
&& cachedData != null
&& cachedData.getTimestampMillis() < savingsGoals.getTimestampMillis()
&& savingsGoals.getValue().getSavingsGoals().size() != 0;
}
};
}
private Observable<Timestamped<SavingsGoals>> getGoalsFromBothSources() {
Log.d(App.FLOW, "getGoalsFromBothSources:explicit");
return Observable.merge(
observableGoal.getTimestampedSavingsGoals().subscribeOn(Schedulers.io()),
api.getSavingsGoals()
.timestamp()
.flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
@Override
public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> savingsGoals) {
Log.d(App.FLOW, "getGoalsFromBothSources:implicit");
return observableGoal.saveAllWithTimestamp(savingsGoals.getTimestampMillis(), savingsGoals.getValue().getSavingsGoals());
}
}))
.subscribeOn(Schedulers.io());
}
您知道在一个观察员中执行此操作的方法吗?
Do you know the approach to do this in one Observer?
可能的解决方案:
@Override
public Observable<SavingsGoals> getSavingGoals() {
return api.getSavingsGoals()
.publish(network ->
Observable.mergeDelayError(
observableGoal.getSavingsGoals().takeUntil(network),
network.flatMap(new Func1<SavingsGoals, Observable<SavingsGoals>>() {
@Override
public Observable<SavingsGoals> call(SavingsGoals savingsGoals) {
return observableGoal.saveAll(savingsGoals.getSavingsGoals());
}
})
)
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
- 对不起,IDE中的热替换掩盖了该方法的问题:如果网络不可用且缓存线程首先完成,则第一个错误将终止整个合并(由mergeDelayError解决),第二个情况是当缓存为为空并从Web请求返回第一个数据将不会在订阅服务器上返回.如您所见,如我在代码中所示,保存和传统合并后,我的方法返回Observable,可以正确处理这种情况,但是由于某种原因,takeUntil不能处理.问题仍然悬而未决.
推荐答案
第一个问题:您可以使用doOnNext方法从网络结果"中保存结果,看起来像这样
For first question : You can save the result from Network Result by using doOnNext Method, It would looks something like this
public Observable<NetworkResponse> getDataFromNetwork(
final Request request) {
return networkCall.doOnNext(networkResponse -> saveToStorage(networkResponse);
}
现在要结合存储和在线的两个结果,最好的方法是结合发布和合并.我建议您观看此话题.代码看起来像这样
Now to combine the two results from both Storage and Online, the best way is to combine with publish and merge. I recommend watching this talk. The code would look something like this
public Observable<Response> getData(final Request request) {
return dataService.getDataFromNetwork(request)
.publish(networkResponse -> Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)));
}
为什么使用发布并合并您的问题?publish方法使响应可在回调中访问.takeUntil表示您将从存储中获取数据,但是如果由于某种原因在完成访问存储数据之前网络调用已完成,则将停止存储.这样,您可以确保在从存储中获取旧数据之前,即使已完成也总是显示来自网络的新数据.
Why use publish and merge you my ask? publish method makes the response accessible in the callback. takeUntil means that you will take the data from storage but you will stop it IF for some reason, network call is finished before accessing storage data is finished. This way, you can be sure that new data from network is always shown even if it's finished before getting old data from storage.
最后但并非最不重要的,在您的订户OnNext中,只需将项目添加到列表中即可.(list.clear和list.addAll)或类似的功能,或者在您的情况下,view.showData()
The last but not least, in your subscriber OnNext just add the items to the list. (list.clear and list.addAll) Or similar functions or in you case view.showData()
对于当网络出现错误时呼叫中断,请在末尾添加onErrorResumeNext.
For The call getting disrupted when there's an error from network, add onErrorResumeNext at the end.
public Observable<Response> getData(final Request request) {
return dataService.getDataFromNetwork(request)
.publish(networkResponse -> Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)))
.onErrorResumeNext(dataService.getDataFromStorage(request);
}
这篇关于Andorid rxJava:如何从缓存中获取数据并同时在后台对其进行更新?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!