如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的一些更好的方法编写以下代码? [英] How to write below code using MergeMap or FlatMap or some better way with rxJs-operators?
问题描述
我有两个可观察的管道.我需要一个接一个地运行并比较两个值是否相等.我尝试了下面的代码.这应该可以工作,当第一个可观察值发出时,它应该去取第二个 obserbla 值,并且应该首先返回值.我需要一些专家帮助,以更好的方式重新编写此代码.
I have two observable pipes. I need to run one after the other and compare two values equal or not. I tried the below code.This should work, when the first observable value emitted , it should go and take second obserbla value and should comapre it first return value.I need to some expert help , to refator this code better way.
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
(res: UnitDetail) =>{
if(res.unitTwo){
this.appStore.select(selectUnit).
pipe(shareReplayUntil(this.destroySub)).subscribe(
(unitId: string) => {
if(unitId === res.unitTwo){
this.sameUnit = true;
}else{
this.sameUnit = false;
}
});
}
}
);
推荐答案
你不需要高阶运算符,因为可观察对象 this.selectedUnitDetailModel$
和 this.appStore.select(selectUnit)
是相互独立的.相反,您可以使用诸如 forkJoin
、combineLatest
或 zip
之类的函数来并行接收来自它们的通知.
You don't need higher order operators since the observables this.selectedUnitDetailModel$
and this.appStore.select(selectUnit)
are independent of each other. Instead you could use functions like forkJoin
, combineLatest
or zip
to get the notifications from them in parallel.
您可以在此处找到这些函数的不同之处.
You could find difference b/n these functions here.
试试下面的方法
forkJoin(
this.selectedUnitDetailModel$.pipe(take(1)), // <-- complete on first emission
this.appStore.select(selectUnit).pipe(take(1)) // <-- complete on first emission
).subscribe(
([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
(error) => console.log(error) // <-- handle error
);
forkJoin
仅在源 observable 完成时发出,所以我已经将 take(1)
用管道传送到每个 observable.forkJoin
现在将在每个 observable 和 complete 的第一次发射时发射.因此,减轻了对 shareReplayUntil(this.destroySub)
的需求.
forkJoin
only emits when the source observables complete, so I've piped in take(1)
to each observable. The forkJoin
will now emit on the first emission of each observable and complete. So the need for your shareReplayUntil(this.destroySub)
is mitigated.
然而,如果您需要保持来自 observables 的发射流打开,您可以使用 combineLatest
或 zip
代替.在这种情况下,您可以将 take(1)
替换为您的shareReplayUntil(this.destroySub)".
However, if you need to keep the emission stream from the observables open, you could use combineLatest
or zip
instead. In this case, you could replace the take(1)
with your ``shareReplayUntil(this.destroySub)`.
就像我之前说的,您可以使用 combineLatest
而不是 forkJoin
来启用连续的数据流.
Like I said before, you could use combineLatest
instead of forkJoin
to enable a continuous stream of data.
试试下面的方法
import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
combineLatest(
this.selectedUnitDetailModel$,
this.appStore.select(selectUnit)
).pipe(
takeUntil(this.destroySub) // <-- replaced with `takeUntil` operator
).subscribe(
([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
(error) => console.log(error) // <-- handle error
);
这篇关于如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的一些更好的方法编写以下代码?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!