RxJava:结合两个可选的 observable [英] RxJava: combine two optional observables
问题描述
我有两个 Observable
,我们称它们为 PeanutButter
和 Jelly
.我想将它们组合成一个 Sandwich
Observable
.我可以使用:
I have two Observable
s, let's call them PeanutButter
and Jelly
. I'd like to combine them to a Sandwich
Observable
. I can do that using:
Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
peanutButterObservable,
jellyObservable,
(pb, j) -> makeSandwich(pb, j))
问题是 RX 等待第一个 PeanutButter
和第一个 Jelly
发出,然后才发出第一个组合 Sandwich
但 Jelly
可能永远不会发出,这意味着我永远得到第一个 Sandwich
.
The problem is that RX waits for the first PeanutButter
and the first Jelly
to be emitted before emitting the first combined Sandwich
but Jelly
may never be emitted which means I never get the first Sandwich
.
我想合并这两个提要,以便在发出任一提要的第一个项目时立即发出合并的项目,无论另一个提要是否尚未发出任何内容,我如何在 RxJava 中执行此操作?
I'd like to combine the two feeds such that a combined item is emitted as soon as the first item from either feed is emitted, regardless of whether the other feed has yet to emit anything, how do I do that in RxJava?
推荐答案
一种可能的方法是使用 startWith
运算符在订阅时触发每个流的已知值的发射.这样 combineLatest()
将在任一流发出值时触发.您只需要留意 onNext
消费者中的初始/信号值.
one possible approach would be to use the startWith
operator to trigger an emission of a known value from each stream upon subscription. this way combineLatest()
will trigger if either stream emits a value. you'd just have to be mindful of looking out for the initial/signal values in the onNext
consumer.
这样的东西...:
@Test
public void sandwiches() {
final Observable<String> peanutButters = Observable.just("chunky", "smooth")
.startWith("--initial--");
final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
.startWith("--initial--");
Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
return new Pair<>(peanutButter, jelly);
})
.subscribe(
next -> {
final String peanutButter = next.getFirst();
final String jelly = next.getSecond();
if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
// initial emissions
} else if(peanutButter.equals("--initial--")) {
// jelly emission
} else if(jelly.equals("--initial--")) {
// peanut butter emission
} else {
// peanut butter + jelly emissions
}
},
error -> {
System.err.println("## onError(" + error.getMessage() + ")");
},
() -> {
System.out.println("## onComplete()");
}
);
}
这篇关于RxJava:结合两个可选的 observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!