Andorid rxJava:如何从缓存中获取数据并同时在后台对其进行更新? [英] Andorid rxJava: how to get data from cache and and the same time update it in the background?

查看:66
本文介绍了Andorid rxJava:如何从缓存中获取数据并同时在后台对其进行更新?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚刚开始学习Android的 rxJava ,并希望实现常见用例:

I just start learning rxJava for Android and want to implement the common use case:

  • 从缓存中请求数据并显示给用户
  • 从网络请求数据
  • 服务器更新存储中的数据并自动将其显示给用户

传统上最好的方案是使用 CursorLoader 从缓存中获取数据,在单独的线程中运行Web请求,并通过内容提供商将数据保存到磁盘,内容提供商会自动通知侦听器,并 CursorLoader 会自动更新用户界面.

Traditionally on of the best scenarios was use CursorLoader to get data from cache, run web request in the separate thread and save data to the disk via content provider, content provider automatically notify the listener and CursorLoader autoupdate UI.

在rxJava中,我可以通过运行两个不同的Observer来做到这一点,如下面的代码所示,但是我找不到如何将这两个调用组合为一个以达到我的目标的方法.Googling显示了该线程,但是看起来它只是从缓存中获取数据或从Web服务器中获取数据,但不能同时执行

In rxJava I can do it by running two different Observers as you can see in code below, but I don't find the way how to combine this two calls into the one to reach my aim. Googling shows this thread but it looks like it just get data from the cache or data from the web server, but don't do both RxJava and Cached Data

代码段:

@Override
public Observable<SavingsGoals> getCachedSavingsGoal() {
    return observableGoal.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

@Override
public Observable<SavingsGoals> getRecentSavingsGoal() {
    return api.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

    model.getCachedSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume cached data");
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            Log.d(App.TAG, "Show the next item");
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

    model.getRecentSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume data from the web", e);
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

此外,当前方法的问题之一是缓存,并且不保证Web数据可以顺序运行.过时的数据有可能是最新的,而从Web覆盖的则是最新的.

Also, the one of issues with current approach is cache and web data are not garranted to be run sequently. It is possible when outdated data will come as latest and override recent from web.

为解决此问题,我实现了带时间戳过滤的Observer合并与合并:它从缓存中获取数据,将其传递给下一个观察者,如果缓存已过期,则向网络发起新的调用-通过时间戳过滤解决线程竞争的情况.但是,这种方法的问题是我无法从此Observable返回缓存数据-我需要等待两个请求完成其工作.

To solve this issue I implemented Observer merge with filtration by timestamp: it get data from cache, pass it to the next observer and if cache is outdated fire new call to the web - case for thread competition solved by the filtration with timestamps. However, the issue with this approach I can not return cache data from this Observable - I need to wait when both requests finish their work.

代码段.

    @Override
public Observable<Timestamped<SavingsGoals>> getSavingGoals() {
    return observableGoal
            .getTimestampedSavingsGoals()
            .subscribeOn(Schedulers.io())
            .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                @Override
                public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> cachedData) {
                    Log.d(App.FLOW, "getTimestampedSavingsGoals");
                    return getGoalsFromBothSources()
                            .filter(filterResponse(cachedData));
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread());
}

private Func1<Timestamped<SavingsGoals>, Boolean> filterResponse(Timestamped<SavingsGoals> cachedData) {
    return new Func1<Timestamped<SavingsGoals>, Boolean>() {
        @Override
        public Boolean call(Timestamped<SavingsGoals> savingsGoals) {
            return savingsGoals != null
                    && cachedData != null
                    && cachedData.getTimestampMillis() < savingsGoals.getTimestampMillis()
                    && savingsGoals.getValue().getSavingsGoals().size() != 0;
        }
    };
}

private Observable<Timestamped<SavingsGoals>> getGoalsFromBothSources() {
    Log.d(App.FLOW, "getGoalsFromBothSources:explicit");
    return Observable.merge(
            observableGoal.getTimestampedSavingsGoals().subscribeOn(Schedulers.io()),
            api.getSavingsGoals()
                    .timestamp()
                    .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                        @Override
                        public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> savingsGoals) {
                            Log.d(App.FLOW, "getGoalsFromBothSources:implicit");
                            return observableGoal.saveAllWithTimestamp(savingsGoals.getTimestampMillis(), savingsGoals.getValue().getSavingsGoals());
                        }
                    }))
                    .subscribeOn(Schedulers.io());
}

您知道在一个观察员中执行此操作的方法吗?

Do you know the approach to do this in one Observer?

可能的解决方案:

@Override
public Observable<SavingsGoals> getSavingGoals() {
    return api.getSavingsGoals()
            .publish(network ->
                    Observable.mergeDelayError(
                            observableGoal.getSavingsGoals().takeUntil(network),
                            network.flatMap(new Func1<SavingsGoals, Observable<SavingsGoals>>() {
                                @Override
                                public Observable<SavingsGoals> call(SavingsGoals savingsGoals) {
                                    return observableGoal.saveAll(savingsGoals.getSavingsGoals());
                                }
                            })
                    )
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆