RxJava Pagination


Question

I am fairly new to RxJava. My API response includes the total number of pages and the current page number, like this:

 "pages": 22,
 "page": 1,

I am using Retrofit to make the API calls in the data layer. My API service looks like this:

@GET("/story")
Observable<StoryCollectionEntity> storyCollection(
        @Query("feed_ids") String feed_items,
        @Query("page") int page);

Then:

public Observable<StoryCollectionEntity> storyCollection() {
    return mUserApi.storyCollection(items, page);
}

I subscribe in the domain layer like this:

public void execute(Subscriber UseCaseSubscriber) {
    this.subscription = this.buildUseCaseObservable()
            .subscribeOn(Schedulers.from(threadExecutor))
            .observeOn(postExecutionThread.getScheduler())
            .subscribe(UseCaseSubscriber);
}

@Override
public Observable buildUseCaseObservable() {
    return this.userRepository.stories();
}

I am trying to figure out how to make this observer react to RecyclerView scroll events by emitting the results of the next page, i.e. page 2 on the second scroll event, page 3 on the third, and so on.

Recommended answer

I'll paste the solution I'm using below; it also uses Retrofit. In this example, I use Retrofit to create an Observable of all public repositories from the GitHub API.

I believe the trick here is that I recurse on the Observable and concatenate it with another Observable that is essentially a trigger.

This trigger can emit in a variety of ways (perhaps when the user scrolls to the bottom of the page). In this example, however, it emits as soon as a full page has been handled.
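To make the trigger idea concrete before the RxJava version, here is a minimal plain-Java sketch of the same control flow: each trigger signal loads exactly one more page, and loading stops once the current page number reaches the total. It deliberately leaves RxJava and Retrofit out. `TriggeredPager`, `Page`, and the `fetcher` function are hypothetical names standing in for the real API call, and the `page`/`pages` fields are assumed to mirror the response fields shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;

/** Hypothetical sketch: every trigger signal loads one more page until none are left. */
public class TriggeredPager {

    /** Stand-in for the API response; "page" and "pages" mirror the fields in the question. */
    public static final class Page {
        final int page;
        final int pages;
        final List<String> items;

        public Page(int page, int pages, List<String> items) {
            this.page = page;
            this.pages = pages;
            this.items = items;
        }
    }

    private final IntFunction<Page> fetcher; // assumption: wraps the real network call
    private final List<String> loaded = new ArrayList<>();
    private int nextPage = 1;
    private boolean exhausted = false;

    public TriggeredPager(IntFunction<Page> fetcher) {
        this.fetcher = fetcher;
    }

    /**
     * Call once per trigger signal, e.g. when the list is scrolled to the bottom.
     * Returns true if a page was loaded, false if there was nothing left to load.
     */
    public boolean onTrigger() {
        if (exhausted) {
            return false;
        }
        Page result = fetcher.apply(nextPage);
        loaded.addAll(result.items);
        if (result.page >= result.pages) {
            exhausted = true;          // last page reached, ignore further triggers
        } else {
            nextPage = result.page + 1;
        }
        return true;
    }

    public List<String> loaded() {
        return loaded;
    }

    public static void main(String[] args) {
        // Fake three-page API with two items per page.
        TriggeredPager pager = new TriggeredPager(
                n -> new Page(n, 3, Arrays.asList("item-" + n + "a", "item-" + n + "b")));
        pager.onTrigger(); // initial load
        pager.onTrigger(); // first scroll to the bottom
        System.out.println(pager.loaded());
    }
}
```

In an Android app, `onTrigger()` would presumably be called from a scroll listener, and the fetch would run off the main thread. The RxJava solution expresses the same "wait for a signal, then fetch the next page" step with `concatWith`.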

/***
 * This is a helper wrapper Subscriber that helps you lazily defer
 * continuous paging of a result set from some API.
 * Through the use of a {@link Subject}, it helps notify the original {@link Observable}
 * when to perform an additional fetch.
 * The notification is sent when a certain count of items has been reached.
 * Generally this count represents the page.
 * @param <T> The event type
 */
@Data
public class PagingSubscriber<T> extends Subscriber<T> {

    private final Subject<Void,Void> nextPageTrigger = PublishSubject.create();
    private final long pageSize;
    private long count = 0;
    private final Subscriber<T> delegate;

    /***
     * Creates the {@link PagingSubscriber}
     * @param pageSize
     * @param delegate
     */
    public PagingSubscriber(long pageSize, Subscriber<T> delegate) {
        this.pageSize = pageSize;
        this.delegate = delegate;
    }

    public Observable<Void> getNextPageTrigger() {
        return nextPageTrigger;
    }

    @Override
    public void onStart() {
        delegate.onStart();
    }

    @Override
    public void onCompleted() {
        delegate.onCompleted();
    }

    @Override
    public void onError(Throwable e) {
        delegate.onError(e);
    }

    @Override
    public void onNext(T t) {
        count+=1;
        if (count == pageSize) {
            nextPageTrigger.onNext(null);
            count= 0;
        }
        delegate.onNext(t);
    }

}

@Data
public class GitHubRepositoryApplication {

    private final GitHubService gitHubService;

    @Inject
    public GitHubRepositoryApplication(GitHubService githubService) {
        this.gitHubService = githubService;
    }

    public Observable<GitHubRepository> printAllRepositories(Observable<Void> nextPageTrigger) {
        return printRepositoryPages(GitHubService.FIRST_PAGE, nextPageTrigger)
                .flatMapIterable(r -> r.body());
    }


    public Observable<Response<List<GitHubRepository>>> printRepositoryPages(String startingPage, Observable<Void> nextPageTrigger) {
        return gitHubService.listRepos(startingPage)
                .concatMap(response -> {
                    Optional<String> nextPage = Optional.ofNullable(response.headers().get(HttpHeaders.LINK))
                            .flatMap(header -> GitHubServiceUtils.getNextToken(header));

                    if (!nextPage.isPresent()) {
                        return Observable.just(response);
                    }
                    return Observable.just(response)
                            .concatWith(nextPageTrigger.limit(1).ignoreElements().cast(Response.class))
                            .concatWith(printRepositoryPages(nextPage.get(), nextPageTrigger));
                });
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new GitHubModule());

        GitHubRepositoryApplication app = injector.getInstance(GitHubRepositoryApplication.class);

        Subscriber<GitHubRepository> subscriber = new Subscriber<GitHubRepository>() {

            private final Logger log = LoggerFactory.getLogger(getClass());

            @Override
            public void onStart() {
                log.debug("STARTING");
                request(1l);//we need to begin the request
            }

            @Override
            public void onCompleted() {
                log.debug("COMPLETED");
            }

            @Override
            public void onError(Throwable e) {
                log.error("ERROR",e);
            }

            @Override
            public void onNext(GitHubRepository gitHubRepository) {
                log.debug("{}",gitHubRepository);
                request(1l);//we need to make sure we have asked for another element
            }
        };

        PagingSubscriber<GitHubRepository> pagingSubscriber = new PagingSubscriber<>(GitHubService.PAGE_SIZE, subscriber);

        //In order for the JVM not to quit out, we make sure we turn our Observable to
        //a BlockingObservable, so that all of it will finish.
        Observable<GitHubRepository> observable =
                app.printAllRepositories(pagingSubscriber.getNextPageTrigger());
        observable.toBlocking().subscribe(pagingSubscriber);

    }

}
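
The code above calls `GitHubServiceUtils.getNextToken`, which the answer does not show. Below is a minimal sketch of what such a helper might do, assuming the `Link` response header uses the `<url>; rel="next"` form that GitHub sends for paginated results. `LinkHeaderSketch` and `nextUrl` are hypothetical names, and the real helper may return a token rather than a full URL.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the rel="next" target from a Link header. */
public class LinkHeaderSketch {

    // Matches one header entry of the form: <url>; rel="name"
    // Assumption: rel is the first parameter after the URL.
    private static final Pattern ENTRY =
            Pattern.compile("<([^>]*)>\\s*;\\s*rel=\"([^\"]*)\"");

    /** Returns the URL tagged rel="next", or empty if the header has none. */
    public static Optional<String> nextUrl(String linkHeader) {
        if (linkHeader == null) {
            return Optional.empty();
        }
        Matcher m = ENTRY.matcher(linkHeader);
        while (m.find()) {
            if ("next".equals(m.group(2))) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String header = "<https://api.github.com/repositories?since=364>; rel=\"next\", "
                + "<https://api.github.com/repositories{?since}>; rel=\"first\"";
        System.out.println(nextUrl(header).orElse("none"));
    }
}
```

An empty result corresponds to the `!nextPage.isPresent()` branch in `printRepositoryPages`, where the recursion stops and the last page is emitted on its own.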
