如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的一些更好的方法编写以下代码? [英] How to write below code using MergeMap or FlatMap or some better way with rxJs-operators?

查看:44
本文介绍了如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的一些更好的方法编写以下代码?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个可观察的管道.我需要一个接一个地运行并比较两个值是否相等.我尝试了下面的代码.这应该可以工作,当第一个可观察值发出时,它应该去取第二个 obserbla 值,并且应该首先返回值.我需要一些专家帮助,以更好的方式重新编写此代码.

I have two observable pipes. I need to run one after the other and compare two values equal or not. I tried the below code.This should work, when the first observable value emitted , it should go and take second obserbla value and should comapre it first return value.I need to some expert help , to refator this code better way.

   this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
          (res: UnitDetail) =>{
              if(res.unitTwo){
                this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub)).subscribe(
                  (unitId: string) => {
                    if(unitId ===  res.unitTwo){
                      this.sameUnit = true;
                    }else{
                      this.sameUnit = false;
                    }
                  });
              }
          }
       );

推荐答案

你不需要高阶运算符,因为可观察对象 this.selectedUnitDetailModel$this.appStore.select(selectUnit) 是相互独立的.相反,您可以使用诸如 forkJoincombineLatestzip 之类的函数来并行接收来自它们的通知.

You don't need higher order operators since the observables this.selectedUnitDetailModel$ and this.appStore.select(selectUnit) are independent of each other. Instead you could use functions like forkJoin, combineLatest or zip to get the notifications from them in parallel.

您可以在此处找到这些函数的不同之处.

You could find difference b/n these functions here.

试试下面的方法

forkJoin(
  this.selectedUnitDetailModel$.pipe(take(1)),      // <-- complete on first emission
  this.appStore.select(selectUnit).pipe(take(1))    // <-- complete on first emission
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

forkJoin 仅在源 observable 完成时发出,所以我已经将 take(1) 用管道传送到每个 observable.forkJoin 现在将在每个 observable 和 complete 的第一次发射时发射.因此,减轻了对 shareReplayUntil(this.destroySub) 的需求.

forkJoin only emits when the source observables complete, so I've piped in take(1) to each observable. The forkJoin will now emit on the first emission of each observable and complete. So the need for your shareReplayUntil(this.destroySub) is mitigated.

然而,如果您需要保持来自 observables 的发射流打开,您可以使用 combineLatestzip 代替.在这种情况下,您可以将 take(1) 替换为您的shareReplayUntil(this.destroySub)".

However, if you need to keep the emission stream from the observables open, you could use combineLatest or zip instead. In this case, you could replace the take(1) with your ``shareReplayUntil(this.destroySub)`.

就像我之前说的,您可以使用 combineLatest 而不是 forkJoin 来启用连续的数据流.

Like I said before, you could use combineLatest instead of forkJoin to enable a continuous stream of data.

试试下面的方法

import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

combineLatest(
  this.selectedUnitDetailModel$,
  this.appStore.select(selectUnit)
).pipe(
  takeUntil(this.destroySub)         // <-- replaced with `takeUntil` operator
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

这篇关于如何使用 MergeMap 或 FlatMap 或使用 rxJs-operators 的一些更好的方法编写以下代码?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆