无需递归即可对 Observable 结果进行分页 - RxJava [英] Paginate Observable results without recursion - RxJava
本文介绍了无需递归即可对 Observable 结果进行分页 - RxJava的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个非常标准的 API 分页问题,您可以通过一些简单的递归来处理.这是一个虚构的例子:
I've got a pretty standard API pagination problem which you can handle with some simple recursion. Here's a fabricated example:
public Observable<List<Result>> scan() {
return scanPage(Optional.empty(), ImmutableList.of());
}
private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) {
return this.scanner.scan(startKey, LIMIT)
.flatMap(page -> {
if (!page.getLastKey().isPresent()) {
return Observable.just(results);
}
return scanPage(page.getLastKey(), ImmutableList.<Result>builder()
.addAll(results)
.addAll(page.getResults())
.build()
);
});
}
但这显然会创建一个庞大的调用堆栈.我如何强制执行此操作但维护 Observable 流?
But this can obviously create a massive callstack. How can I do this imperatively but maintain the Observable stream?
这是一个命令式阻塞示例:
Here's an imperative blocking example:
public List<Result> scan() {
Optional<String> startKey = Optional.empty();
final ImmutableList.Builder<Result> results = ImmutableList.builder();
do {
final Page page = this.scanner.scan(startKey);
startKey = page.getLastKey();
results.addAll(page.getResults());
} while (startKey.isPresent());
return results.build();
}
推荐答案
这不是最优雅的解决方案,但您可以使用主题和副作用.看下面的玩具例子
It's not the most elegant of solutions but you can use subjects and side-effects. See the toy example below
import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import rx.subjects.*;
public class Pagination {
static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>();
public static void main(String[] args) throws InterruptedException {
pages.put("default", new ArrayList<String>());
pages.put("2", new ArrayList<String>());
pages.put("3", new ArrayList<String>());
pages.put("4", new ArrayList<String>());
pages.get("default").add("2");
pages.get("default").add("Maths");
pages.get("default").add("Chemistry");
pages.get("2").add("3");
pages.get("2").add("Physics");
pages.get("2").add("Biology");
pages.get("3").add("4");
pages.get("3").add("Art");
pages.get("4").add("");
pages.get("4").add("Geography");
Observable<List<String>> ret = Observable.defer(() ->
{
System.out.println("Building Observable");
ReplaySubject<String> pagecontrol = ReplaySubject.<String>create(1);
Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey ->
{
if (!aKey.equals("")) {
return Observable.just(pages.get(aKey)).doOnNext(page -> pagecontrol.onNext(page.get(0)));
} else {
return Observable.<List<String>>empty().doOnCompleted(()->pagecontrol.onCompleted());
}
});
pagecontrol.onNext("default");
return ret2;
});
// Use this if you want to ensure work isn't done again
ret = ret.cache();
ret.subscribe(l -> System.out.println("Sub 1 : " + l));
ret.subscribe(l -> System.out.println("Sub 2 : " + l));
Thread.sleep(2000L);
}
}
编辑并改进.
这篇关于无需递归即可对 Observable 结果进行分页 - RxJava的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文