Android:使用repeatWhen、takeUntil和改造过滤器使用rx-java无限滚动 [英] Android: infinite scroll with rx-java using repeatWhen, takeUntil and filter with retrofit
问题描述
我正在使用带有 RxJava 的 Retrofit 2.2.分页的工作方式是这样的:我得到第一批数据,我必须用相同的参数请求第二批数据,除了 lastUpdated 日期,然后如果我得到空或同一批数据,这意味着有没有更多的项目.我找到了这篇很棒的文章 https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a#.40aeibaja 关于如何做吧.所以我的代码是:
I am using Retrofit 2.2 with RxJava. The pagination works like this: I get the first batch of data, I have to request the second batch of data with the same params except one which is the lastUpdated date and then if I get empty or the same batch of data it means there are no more items. I have found this great article https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a#.40aeibaja on how to do it. So my code is:
private Observable<Integer> syncDataPoints(final String baseUrl, final String apiKey,
final long surveyGroupId) {
final List<ApiDataPoint> lastBatch = new ArrayList<>();
Timber.d("start syncDataPoints");
return loadAndSave(baseUrl, apiKey, surveyGroupId, lastBatch)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Void> observable) {
Timber.d("Calling repeatWhen");
return observable.delay(5, TimeUnit.SECONDS);
}
})
.takeUntil(new Func1<List<ApiDataPoint>, Boolean>() {
@Override
public Boolean call(List<ApiDataPoint> apiDataPoints) {
boolean done = apiDataPoints.isEmpty();
if (done) {
Timber.d("takeUntil : finished");
} else {
Timber.d("takeUntil : will query again");
}
return done;
}
})
.filter(new Func1<List<ApiDataPoint>, Boolean>() {
@Override
public Boolean call(List<ApiDataPoint> apiDataPoints) {
boolean unfiltered = apiDataPoints.isEmpty();
if (unfiltered) {
Timber.d("filtered");
} else {
Timber.d("not filtered");
}
return unfiltered;
}
}).map(new Func1<List<ApiDataPoint>, Integer>() {
@Override
public Integer call(List<ApiDataPoint> apiDataPoints) {
Timber.d("Finished polling server");
return 0;
}
});
}
private Observable<List<ApiDataPoint>> loadAndSave(final String baseUrl, final String apiKey,
final long surveyGroupId, final List<ApiDataPoint> lastBatch) {
return loadNewDataPoints(baseUrl, apiKey, surveyGroupId)
.concatMap(new Func1<ApiLocaleResult, Observable<List<ApiDataPoint>>>() {
@Override
public Observable<List<ApiDataPoint>> call(ApiLocaleResult apiLocaleResult) {
return saveToDataBase(apiLocaleResult, lastBatch);
}
});
}
private Observable<ApiLocaleResult> loadNewDataPoints(final String baseUrl, final String apiKey,
final long surveyGroupId) {
Timber.d("loadNewDataPoints");
return Observable.just(true).concatMap(new Func1<Object, Observable<ApiLocaleResult>>() {
@Override
public Observable<ApiLocaleResult> call(Object o) {
Timber.d("loadNewDataPoints call");
return restApi
.loadNewDataPoints(baseUrl, apiKey, surveyGroupId,
getSyncedTime(surveyGroupId));
}
});
}
如您所见,有趣的方法是 loadNewDataPoint
s,我希望它被调用,直到没有更多数据点.正如你所看到的 Observable.just(true).concatMap
是一个黑客,因为如果我删除这个 concat 映射 restApi.loadNewDataPoints(....)
不会被调用虽然在日志中我可以看到 api 确实被调用了,但是使用相同的旧参数,当然它返回的结果与第一次相同,因此同步停止,saveToDataBase 确实被调用得很好.使用我的 hack 它可以工作,但我想了解为什么它不能以其他方式工作,以及是否有更好的方法来做到这一点.非常感谢!
As you can see the interesting method is loadNewDataPoint
s and I want it to be called until there are no more datapoints. As you can see Observable.just(true).concatMap
is a hack because if I remove this concat map the restApi.loadNewDataPoints(....)
does not get called although in the logs I can see that the api does get called but with the same old params and of course it returns the same results as the first time so syncing stops, saveToDataBase does get called fine. With my hack it works but I want to understand why it does not work the other way and also if there is a better way to do this. Thanks a lot!
推荐答案
因此,我编写了这种 API(称为 Keyset Pagination)并针对它们实现了 Rx 客户端.
So, I've written this kind of APIs (it's called Keyset Pagination) and implemented Rx clients against them.
这是 BehaviorSubjects 有用的情况之一:
This is one of the cases where BehaviorSubjects are useful:
S initialState = null;
BehaviorProcessor<T> subject = BehaviorProcessor.createDefault(initialState);
return subject
.flatMap(state -> getNextElements(state).singleOrError().toFlowable(), Pair::of, 1)
.serialize()
.flatMap(stateValuePair -> {
S state = stateValuePair.getLeft();
R retrievedValue = stateValuePair.getRight();
if(isEmpty(retrievedValue)) {
subject.onComplete();
return Flowable.empty();
} else {
subject.onNext(getNextState(state, retrievedValue));
return Flowable.just(retrievedValue);
}
}
.doOnUnsubscribe(subject::onCompleted)
.map(value -> ...)
这里
getNextElement
根据状态执行网络调用并返回一个具有单个值的响应式流isEmpty
判断返回值是否为空表示元素结束getNextState
将传入的状态与检索到的值结合起来,以确定getNextElement
的下一个状态.
getNextElement
performs the network call based on a state and returns a reactive stream with a single valueisEmpty
determines whether the returned value is empty indicating end of elementsgetNextState
combines the passed-in state with the retrieved value to determine the next state forgetNextElement
.
如果发生错误(它将被传播)并且如果您在结束前取消订阅(查询将被终止),它将正常工作.
It will work correctly if an error occurs (it will be propagated) and if you unsubscribe before the end (queries will get terminated).
当然,在您的特定情况下,这些不需要是单独的方法或复杂类型.
Of course, in your specific case these don't need to be separate methods or complex types.
这篇关于Android:使用repeatWhen、takeUntil和改造过滤器使用rx-java无限滚动的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!