How to wait for async Observable to complete
Question
I'm trying to build a sample using RxJava. The sample should orchestrate a ReactiveWareService and a ReactiveReviewService, returning a WareAndReview composite.
ReactiveWareService
public Observable<Ware> findWares() {
    return Observable.from(wareService.findWares());
}
ReactiveReviewService: reviewService.findReviewsByItem calls Thread.sleep to simulate latency.
public Observable<Review> findReviewsByItem(final String item) {
    return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
        try {
            List<Review> reviews = reviewService.findReviewsByItem(item);
            reviews.forEach(observer::onNext);
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }
    }));
}
public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
    final List<WareAndReview> wareAndReviews = new ArrayList<>();
    wareService.findWares()
        .map(WareAndReview::new)
        .subscribe(wr -> {
            wareAndReviews.add(wr);
            //Async!!!!
            reviewService.findReviewsByItem(wr.getWare().getItem())
                .subscribe(wr::addReview,
                    throwable -> System.out.println("Error while trying to find reviews for " + wr)
                );
            }
        );
    //TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {}
    return wareAndReviews;
}
Given that I don't want to return an Observable, how can I wait for the async Observable (findReviewsByItem) to complete?
Recommended answer
Most of your example can be rewritten with standard RxJava operators that work well together:
public class Example {
    Scheduler scheduler = Schedulers.from(executor);

    public Observable<Review> findReviewsByItem(final String item) {
        return Observable.just(item)
            .subscribeOn(scheduler)
            .flatMapIterable(reviewService::findReviewsByItem);
    }

    public List<WareAndReview> findWaresWithReviews() {
        return wareService
            .findWares()
            .map(WareAndReview::new)
            .flatMap(wr -> {
                return reviewService
                    .findReviewsByItem(wr.getWare().getItem())
                    .doOnNext(wr::addReview)
                    .lastOrDefault(null)
                    .map(v -> wr);
            })
            .toList()
            .toBlocking()
            .first();
    }
}
Whenever you want to compose services like this, think of flatMap first. You don't need to block for each sub-Observable; block only at the very end with toBlocking(), and only if really necessary.
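The "start everything, block only once at the end" shape can also be tried without any RxJava dependency. The sketch below is not RxJava and not the answer's code: it swaps in CompletableFuture from the JDK as a rough analogue, and the class name BlockOnceAtTheEnd, the string-based reviews, and the 100 ms delay are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class BlockOnceAtTheEnd {

    // Hypothetical stand-in for the slow, blocking review lookup.
    static List<String> findReviewsByItem(String item) {
        try {
            Thread.sleep(100); // simulated latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Arrays.asList(item + ": good", item + ": bad");
    }

    // Returns one list of reviews per item, in the same order as the input.
    static List<List<String>> findAllReviews(List<String> items) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Start every lookup without waiting for any of them.
            List<CompletableFuture<List<String>>> pending = new ArrayList<>();
            for (String item : items) {
                pending.add(CompletableFuture.supplyAsync(() -> findReviewsByItem(item), executor));
            }
            // Block exactly once, until all lookups have completed.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            // Every future is already complete here, so these join() calls do not wait.
            return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(findAllReviews(Arrays.asList("apple", "pear")));
    }
}
```

As in the flatMap version, no fixed Thread.sleep is needed to guess when the work is done: the single join() on the combined future returns as soon as the last lookup finishes.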