RxJs:从由 ID 标识的其他流更新列表中的值 [英] RxJs: updating values in a list from an other stream identified by an ID

查看:45
本文介绍了RxJs:从由 ID 标识的其他流更新列表中的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个 RX 无限流(我们称它们为 mainValuesdecoratorValues).名为 mainValues 的一个将元素添加到名为 valueList 的列表中.另一个流 (decoratorValues) 应该为列表中的这些元素分配属性.

I have two RX infinite streams (let's call them mainValues and decoratorValues). The one called mainValues are adding elements to a list called valueList. The other stream (decoratorValues) should assign properties to these elements in the list.

两个流中的元素以随机顺序随机到达,我需要以mainValues放入列表中的任一顺序来解决它尽快,而 decoratorValues 不会丢失,直到它们的相应元素(由 ID 标识)到达.

Elements in the two streams are arriving at random times in random order, and I need to solve it in either order that mainValues are put into the list as soon as available, while decoratorValues are not lost until their corresponding elements (identified by an ID) arrive.

const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b'  }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];

// delay here is only for demonstration purpose 
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.fromArray(mainValues).delay(2000);
const decoratorStream = Rx.Observable.fromArray(decoratorValues);

const valueList = [];

mainValueStream.subscribe(
  val => valueList.push(val), 
  (err) => console.log(err),
  () => console.log('complete', valueList));

// CODE time independent merging of the two streams go here

// resulting valueList should be:  [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b', decoratedValue: false }, { id: 3, someValue: 'c', decoratedValue: true } ]

解决思路

mainValueStream
  .concatMap(main => {
     decoratorStream
       .filter(dec => dec.id === main.id)
       .do(dec => {
          main.decoratedValue = dec.decoratedValue;
     }).subscribe(x => x);

     return [main];
  })
  .subscribe(
    val => valueList.push(val), 
    (err) => console.log(err),
    () => console.log('complete', valueList));

不幸的是,这仅在所有装饰器值主要价值流之前到达时才有效.

Unfortunately this only works when all of the decorator values has arrived before the main value stream.

我的第二个想法是添加第二个流,检查给定的值是否已经在 valueList 中并更新它,如果它具有具有适当 ID 的元素.

My second idea would be adding a second stream that checks if the given value is already in valueList and updates it, if it has the element with the appropriate ID.

是否存在只产生一个流的与时间无关的解决方案?或者我被卡住了,因为我想用一个流来解决它?

推荐答案

看起来最好的选择是 forkJoin() 这里.我正在考虑使用 zip() 但由于您在每个列表中没有相同的顺序和项目数量,您可能必须等到两者都完成,然后再处理它们的结果.

It seems like the best option is forkJoin() here. I was thinking about using zip() but since you don't have the same order and number of items in each list you'll probably have to wait until both complete and then work with their results.

const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b'  }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];

// delay here is only for demonstration purpose
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.from(mainValues).delay(500).toArray();
const decoratorStream = Rx.Observable.from(decoratorValues).toArray();

Observable.forkJoin(mainValueStream, decoratorStream, (main, decorator) => {
    main.forEach(m => {
      decorator.forEach(d => {
        if (m.id === d.id) {
          Object.assign(m, d);
        }
      })
    });

    return main;
  })
  .subscribe(val => console.log(val));

我在 forkJoin() 的结果选择器函数中使用了两个嵌套的 forEach() ,因为它看起来很简单(注意 Object.assign 从 ES6 开始存在).我可以将它分成两个 Observable 流,然后过滤并合并它们,但这会变得非常混乱.

I'm using two nested forEach()s within forkJoin()'s result selector function because it seems like the easies way (note that Object.assign exists since ES6). I could split it into two Observable streams then filter and merge them but that would get very messy.

这会打印到控制台:

[ { id: 1, someValue: 'a' },
  { id: 2, someValue: 'b', decoratedValue: false },
  { id: 3, someValue: 'c', decoratedValue: true } ]

如果你需要所有东西都是异步的,那么你可以使用 scan() 在装饰器到达时收集它们:

If you need everything to be asynchronous than you can use scan() to collect decorators as they arrive:

const mainValueStream = Rx.Observable.from(mainValues)
  .concatMap(val => Observable.of(val).delay(Math.random() * 1000));

const decoratorStream = Rx.Observable.from(decoratorValues)
  .concatMap(val => Observable.of(val).delay(Math.random() * 1000))
  .scan((acc, val) => {
    acc.push(val);
    return acc;
  }, [])
  .share();


mainValueStream
  .mergeMap(main => decoratorStream
    .mergeAll()
    .filter(d => d.id === main.id)
    .defaultIfEmpty(null)
    .map(d => {
      if (d) {
        return Object.assign(main, d);
      } else {
        return main;
      }
    })
    .take(1)
  )
  .subscribe(val => console.log(val));

这会在结果到达时以随机顺序打印结果:

This prints results in random order as they arrive:

{ id: 2, someValue: 'b', decoratedValue: false }
{ id: 1, someValue: 'a' }
{ id: 3, someValue: 'c', decoratedValue: true }

核心是从 decoratorStream 到达的集合项.然后在 mainValueStream 中的每个项目上,我创建另一个 Observable,它在收到匹配的装饰项目之前不会发出.

The core is collection items from decoratorStream as they arrive. Then on each item from mainValueStream I make another Observable that doesn't emit until it receives its matching decorate item.

这样做的缺点是 scan() 中的累加器仍在增长,我没有简单的方法可以释放已经使用的项目.

Downside of this is that the accumulator in scan() is still growing and there's no easy way I can release already used items.

这篇关于RxJs:从由 ID 标识的其他流更新列表中的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆