Paginate Observable结果没有递归 - RxJava [英] Paginate Observable results without recursion - RxJava

查看:151
本文介绍了Paginate Observable结果没有递归 - RxJava的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个非常标准的API分页问题,​​您可以使用一些简单的递归来处理。这是一个虚构的例子:

I've got a pretty standard API pagination problem which you can handle with some simple recursion. Here's a fabricated example:

public Observable<List<Result>> scan() {
    return scanPage(Optional.empty(), ImmutableList.of());
}

private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) {
    return this.scanner.scan(startKey, LIMIT)
            .flatMap(page -> {
                if (!page.getLastKey().isPresent()) {
                    return Observable.just(results);
                }
                return scanPage(page.getLastKey(), ImmutableList.<Result>builder()
                        .addAll(results)
                        .addAll(page.getResults())
                        .build()
                );
            });
}

但这显然可以创建一个巨大的callstack。我如何强制执行此操作但保持Observable流?

But this can obviously create a massive callstack. How can I do this imperatively but maintain the Observable stream?

这是一个命令式阻塞示例:

Here's an imperative blocking example:

public List<Result> scan() {
    Optional<String> startKey = Optional.empty();
    final ImmutableList.Builder<Result> results = ImmutableList.builder();

    do {
        final Page page = this.scanner.scan(startKey);
        startKey = page.getLastKey();
        results.addAll(page.getResults());
    } while (startKey.isPresent());

    return results.build();
}


推荐答案

这不是最优雅的解决方案,但你可以使用科目和副作用。请参阅下面的玩具示例

It's not the most elegant of solutions but you can use subjects and side-effects. See the toy example below

import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import rx.subjects.*;

public class Pagination {
    static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>();

    public static void main(String[] args) throws InterruptedException {
        pages.put("default", new ArrayList<String>());
        pages.put("2", new ArrayList<String>());
        pages.put("3", new ArrayList<String>());
        pages.put("4", new ArrayList<String>());

        pages.get("default").add("2");
        pages.get("default").add("Maths");
        pages.get("default").add("Chemistry");  

        pages.get("2").add("3");
        pages.get("2").add("Physics");   
        pages.get("2").add("Biology"); 

        pages.get("3").add("4");
        pages.get("3").add("Art");   

        pages.get("4").add("");
        pages.get("4").add("Geography"); 



        Observable<List<String>> ret = Observable.defer(() -> 
        { 
            System.out.println("Building Observable");
            ReplaySubject<String> pagecontrol = ReplaySubject.<String>create(1);
            Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey -> 
            {
                if (!aKey.equals("")) {
                    return Observable.just(pages.get(aKey)).doOnNext(page -> pagecontrol.onNext(page.get(0)));
                } else {
                    return Observable.<List<String>>empty().doOnCompleted(()->pagecontrol.onCompleted());
                }
            });
            pagecontrol.onNext("default");
            return ret2;
        });
        // Use this if you want to ensure work isn't done again
        ret = ret.cache();
        ret.subscribe(l -> System.out.println("Sub 1 : " + l));
        ret.subscribe(l -> System.out.println("Sub 2 : " + l));
        Thread.sleep(2000L);
    }
}

通过改进进行编辑。

这篇关于Paginate Observable结果没有递归 - RxJava的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆