Android:使用repeatWhen、takeUntil和改造过滤器使用rx-java无限滚动 [英] Android: infinite scroll with rx-java using repeatWhen, takeUntil and filter with retrofit

查看:74
本文介绍了Android:使用repeatWhen、takeUntil和改造过滤器使用rx-java无限滚动的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用带有 RxJava 的 Retrofit 2.2.分页的工作方式是这样的:我得到第一批数据,我必须用相同的参数请求第二批数据,除了 lastUpdated 日期,然后如果我得到空或同一批数据,这意味着有没有更多的项目.我找到了这篇很棒的文章 https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a#.40aeibaja 关于如何做吧.所以我的代码是:

I am using Retrofit 2.2 with RxJava. The pagination works like this: I get the first batch of data, I have to request the second batch of data with the same params except one which is the lastUpdated date and then if I get empty or the same batch of data it means there are no more items. I have found this great article https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a#.40aeibaja on how to do it. So my code is:

private Observable<Integer> syncDataPoints(final String baseUrl, final String apiKey,
        final long surveyGroupId) {
    final List<ApiDataPoint> lastBatch = new ArrayList<>();
    Timber.d("start syncDataPoints");
    return loadAndSave(baseUrl, apiKey, surveyGroupId, lastBatch)
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                @Override
                public Observable<?> call(final Observable<? extends Void> observable) {
                    Timber.d("Calling repeatWhen");
                    return observable.delay(5, TimeUnit.SECONDS);
                }
            })
            .takeUntil(new Func1<List<ApiDataPoint>, Boolean>() {
                @Override
                public Boolean call(List<ApiDataPoint> apiDataPoints) {
                    boolean done = apiDataPoints.isEmpty();
                    if (done) {
                        Timber.d("takeUntil : finished");
                    } else {
                        Timber.d("takeUntil : will query again");
                    }
                    return done;
                }
            })
            .filter(new Func1<List<ApiDataPoint>, Boolean>() {
                @Override
                public Boolean call(List<ApiDataPoint> apiDataPoints) {
                    boolean unfiltered = apiDataPoints.isEmpty();
                    if (unfiltered) {
                        Timber.d("filtered");
                    } else {
                        Timber.d("not filtered");
                    }
                    return unfiltered;
                }
            }).map(new Func1<List<ApiDataPoint>, Integer>() {
                @Override
                public Integer call(List<ApiDataPoint> apiDataPoints) {
                    Timber.d("Finished polling server");
                    return 0;
                }
            });
}

private Observable<List<ApiDataPoint>> loadAndSave(final String baseUrl, final String apiKey,
        final long surveyGroupId, final List<ApiDataPoint> lastBatch) {
    return loadNewDataPoints(baseUrl, apiKey, surveyGroupId)
            .concatMap(new Func1<ApiLocaleResult, Observable<List<ApiDataPoint>>>() {
                @Override
                public Observable<List<ApiDataPoint>> call(ApiLocaleResult apiLocaleResult) {
                    return saveToDataBase(apiLocaleResult, lastBatch);
                }
            });
}


private Observable<ApiLocaleResult> loadNewDataPoints(final String baseUrl, final String apiKey,
        final long surveyGroupId) {
    Timber.d("loadNewDataPoints");

    return Observable.just(true).concatMap(new Func1<Object, Observable<ApiLocaleResult>>() {
        @Override
        public Observable<ApiLocaleResult> call(Object o) {
            Timber.d("loadNewDataPoints call");
            return restApi
                    .loadNewDataPoints(baseUrl, apiKey, surveyGroupId,
                            getSyncedTime(surveyGroupId));
        }
    });
}

如您所见,有趣的方法是 loadNewDataPoints,我希望它被调用,直到没有更多数据点.正如你所看到的 Observable.just(true).concatMap 是一个黑客,因为如果我删除这个 concat 映射 restApi.loadNewDataPoints(....) 不会被调用虽然在日志中我可以看到 api 确实被调用了,但是使用相同的旧参数,当然它返回的结果与第一次相同,因此同步停止,saveToDataBase 确实被调用得很好.使用我的 hack 它可以工作,但我想了解为什么它不能以其他方式工作,以及是否有更好的方法来做到这一点.非常感谢!

As you can see the interesting method is loadNewDataPoints and I want it to be called until there are no more datapoints. As you can see Observable.just(true).concatMap is a hack because if I remove this concat map the restApi.loadNewDataPoints(....) does not get called although in the logs I can see that the api does get called but with the same old params and of course it returns the same results as the first time so syncing stops, saveToDataBase does get called fine. With my hack it works but I want to understand why it does not work the other way and also if there is a better way to do this. Thanks a lot!

推荐答案

因此,我编写了这种 API(称为 Keyset Pagination)并针对它们实现了 Rx 客户端.

So, I've written this kind of APIs (it's called Keyset Pagination) and implemented Rx clients against them.

这是 BehaviorSubjects 有用的情况之一:

This is one of the cases where BehaviorSubjects are useful:

S initialState = null;
BehaviorProcessor<T> subject = BehaviorProcessor.createDefault(initialState);
return subject
  .flatMap(state -> getNextElements(state).singleOrError().toFlowable(), Pair::of, 1)
  .serialize()
  .flatMap(stateValuePair -> {
      S state = stateValuePair.getLeft();
      R retrievedValue = stateValuePair.getRight();
      if(isEmpty(retrievedValue)) {
         subject.onComplete();
         return Flowable.empty();
      } else {
         subject.onNext(getNextState(state, retrievedValue));
         return Flowable.just(retrievedValue);
      }
    }
   .doOnUnsubscribe(subject::onCompleted)
   .map(value -> ...)

这里

  • getNextElement 根据状态执行网络调用并返回一个具有单个值的响应式流
  • isEmpty 判断返回值是否为空表示元素结束
  • getNextState 将传入的状态与检索到的值结合起来,以确定 getNextElement 的下一个状态.
  • getNextElement performs the network call based on a state and returns a reactive stream with a single value
  • isEmpty determines whether the returned value is empty indicating end of elements
  • getNextState combines the passed-in state with the retrieved value to determine the next state for getNextElement.

如果发生错误(它将被传播)并且如果您在结束前取消订阅(查询将被终止),它将正常工作.

It will work correctly if an error occurs (it will be propagated) and if you unsubscribe before the end (queries will get terminated).

当然,在您的特定情况下,这些不需要是单独的方法或复杂类型.

Of course, in your specific case these don't need to be separate methods or complex types.

这篇关于Android:使用repeatWhen、takeUntil和改造过滤器使用rx-java无限滚动的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆