两个 observable 之间的差异 [英] Difference between two observables

查看:32
本文介绍了两个 observable 之间的差异的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有两个 observable.

第一个 observable 是一个特定列表的数组:

<预><代码>[{id: 'zzz', 其他道具在这里...},{id: 'aaa', ...},{id: '007', ...}... 随着时间的推移]

第二个 observable 是一个被忽略列表的数组:

<预><代码>[{id: '007'},//只有id,没有其他道具{id:'zzz'}... 随着时间的推移]

结果应该是一个新的可观察的列表(第一个可观察的),但不能有任何被忽略的列表:

<预><代码>[{id:'aaa',这里还有其他道具...}... 随着时间的推移]

这是我发帖前的样子:

obs2.pipe(withLatestFrom(obs1, ? => ?, filter(?));

解决方案

如果我没理解错,你想做的是

  1. 随着时间的推移汇总传入的项目
  2. 聚合随着时间的推移要忽略的 id
  3. 最后,由于上述两个流都随着时间的推移而发出,因此发出不包含被忽略 ID 的结果列表.

鉴于上述情况,以下是您可以尝试的粗略示例.正如底部所述,根据前两个流的节奏,您将获得不同的结果,因为,这就是异步发生的情况.为了证明这一点,我正在模拟随时间推移事物发射的随机延迟.

希望这会有所帮助!

P.S.:下面是 Typescript,假设 rxjs@^6.

import { BehaviorSubject, combineLatest, of, Observable } from "rxjs";从rxjs/operators"导入{延迟,地图,扫描,concatMap};/*** 数据源*///仅用于展示目的... 模拟随时间发射的项目constsimulatedEmitOverTime = <T>() =>(来源:Observable)=>源.管道(concatMap(thing => of(thing).pipe(delay(Math.random() * 1000))));接口东西{id:字符串;}//随时间变化的事物流const thingsOverTime$ = of({ id: "zzz" },{ id: "aaa" },{ ID:007"}).管道(模拟EmitOverTime());//随着时间的推移被忽略的东西流const 忽略ThingsOverTime$ = of({ id: "007" },{ id: "zzz" }).管道(模拟EmitOverTime());/*** 在你的应用程序中的某个地方*///聚合传入的东西//`scan` 接受一个 reducer 类型的函数constaggregedThings$ = thingsOverTime$.pipe(扫描((aggregatedThings: Thing[],comingThing: Thing) =>聚合事物.concat(传入事物),[]));//从传入的被忽略的事物 id 创建一个集合//一个集合将允许随着时间的推移轻松过滤const ignoreIds$ = ignoreThingsOverTime$.pipe(扫描((excludedIdSet,incomingThing: Thing) =>excludeIdSet.add(incomingThing.id),new Set()));//合并流,然后过滤掉忽略的 idconst sanitizedThings$ = combineLatest(aggregatedThings$, ignoreIds$).管道(map(([东西,被忽略]) => things.filter(({ id }) => !ignored.has(id))));//在需要的地方订阅//注意:最终结果将根据物品进入的时间而有所不同//随着时间的推移(这里正在模拟)sanitizedThings$.subscribe(console.log);

Let's say I have two observables.

The first observable is an array of certain listings:

[
    {id: 'zzz', other props here...},
    {id: 'aaa', ...},
    {id: '007', ...}
    ... and more over time
]

The second observable is an array of ignored listings:

[
    {id: '007'}, // only id, no other props
    {id: 'zzz'}
    ... and more over time
]

The result should be a new observable of listings (first observable) but must not have any of the ignored listings:

[
    {id: 'aaa', other props here...}
    ... and more over time
] 

This is what I have now before posting:

obs2.pipe(withLatestFrom(obs1, ? => ?, filter(?));

解决方案

If I'm understanding correctly, what you'll want to do is

  1. Aggregate the incoming items over time
  2. Aggregate the ids that are to be ignored over time
  3. Finally, as both of the above streams emit over time, emit a resulting list of items that don't include the ignored ids.

Given the above, below is a rough example you could try. As noted towards the bottom, you'll get different results depending on the cadence of the first two streams because, well, thats's what happens with async. To show that, I'm simulating a random delay in the emission of things over time.

Hope this helps!

P.S.: The below is Typescript, assuming rxjs@^6.

import { BehaviorSubject, combineLatest, of, Observable } from "rxjs";
import { delay, map, scan, concatMap } from "rxjs/operators";

/**
 * Data sources
 */

// Just for showcase purposes... Simulates items emitted over time
const simulatedEmitOverTime = <T>() => (source: Observable<T>) =>
  source.pipe(
    concatMap(thing => of(thing).pipe(delay(Math.random() * 1000)))
  );

interface Thing {
  id: string;
}

// Stream of things over time
const thingsOverTime$ = of(
  { id: "zzz" },
  { id: "aaa" },
  { id: "007" }
).pipe(
  simulatedEmitOverTime()
);

// Stream of ignored things over time
const ignoredThingsOverTime$ = of(
  { id: "007" },
  { id: "zzz" }
).pipe(
  simulatedEmitOverTime()
);


/**
 * Somewhere in your app
 */

// Aggregate incoming things
// `scan` takes a reducer-type function
const aggregatedThings$ = thingsOverTime$.pipe(
  scan(
    (aggregatedThings: Thing[], incomingThing: Thing) =>
      aggregatedThings.concat(incomingThing),
    []
  )
);

// Create a Set from incoming ignored thing ids
// A Set will allow for easy filtering over time
const ignoredIds$ = ignoredThingsOverTime$.pipe(
  scan(
    (excludedIdSet, incomingThing: Thing) =>
      excludedIdSet.add(incomingThing.id),
    new Set<string>()
  )
);

// Combine stream and then filter out ignored ids
const sanitizedThings$ = combineLatest(aggregatedThings$, ignoredIds$)
  .pipe(
    map(([things, ignored]) => things.filter(({ id }) => !ignored.has(id)))
  );

// Subscribe where needed
// Note: End result will vary depending on the timing of items coming in
// over time (which is being simulated here-ish)
sanitizedThings$.subscribe(console.log);

这篇关于两个 observable 之间的差异的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆