有没有办法创建这个流序列? [英] There's a way of create this sequence of Streams?

查看:30
本文介绍了有没有办法创建这个流序列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现这个大理石图,具有 N 个 sN$ 的 hipotesis,并且我正在将此流添加到 main$.

I'm trying to implement this marble diagram, with the hipotesis of have a N number of sN$, and I'm adding this streams to the main$.

s1$    +--1--------------------99--------------------->
s2$    +------3--------7------------------------------>

main$  +---[1]-[1, 3]---[1, 7]---[99, 7]-------------->

现在我有一个近似值,但有重复"

Right now I have a aproximation, but with the "repetitions"

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()

main$
  .scan((a, c) => [...a, c], [])
  .subscribe(v => console.log(v))

s1$.subscribe(x => main$.onNext(x))
s2$.subscribe(x => main$.onNext(x))    

s1$.onNext(3)
s2$.onNext(1)

s1$.onNext(6)
s2$.onNext(44)

/*
  Expect:
    [3]
    [3, 1]
    [6, 1]
    [6, 44]
*/

/*
  What I have:
     [3]
     [3, 1]
     [3, 1, 6]
     [3, 1, 6, 44]
*/

有没有办法做到这一点?我还尝试将流 sN$ 添加到 main$ 中:

There's a way of doing this? Also I tried to add the streams sN$ into main$:

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()

main$
  .mergeAll()
  .scan((a, c) => [...a, c], [])
  .subscribe(
    (v) => console.log(v)
  )

main$.onNext(s1$)
main$.onNext(s2$)

s1$.onNext(3)
s2$.onNext(1)

s1$.onNext(6)
s2$.onNext(44)

推荐答案

您可以使用 combineLatest.虽然这仍然需要每个流都以一个值开头,但您可以使用 startWith.

You can use combineLatest. While that still require every stream to start with a value, you can prefix a null value to make every stream start with something using startWith.

const source = Rx.Observable.combineLatest(
  s1.startWith(void 0),
  s2.startWith(void 0),
  s3.startWith(void 0),
  (s1, s2, s3) => [s1, s2, s3])

可选地,您可以从结果数组中删除 undefined 值.

Optional you can remove undefined values from the resulting array.

现在,我们可以扩展它以使用可变的流列表.感谢@xgrommx.

Now, we can extend that to work with a variable list of streams. Credits to @xgrommx.

main$
 .scan((a, c) => a.concat(c), [])
 .switch(obs => Rx.Observable.combineLatest(obs))

我们还可以使用 c.shareReplay(1) 使流在我们切换时记住最后一个值.但是,它不会与 c.startWith(void 0) 结合使用,因此我们可以使用其中之一.

We can also use c.shareReplay(1) to make streams remember there last value when we switch. That however, wont combine with c.startWith(void 0), so we can use either one or the other.

示例:

    const main$ = new Rx.Subject()
    const s1$ = new Rx.Subject(1)
    const s2$ = new Rx.Subject(1)
    const s3$ = new Rx.Subject(1)
    const s4$ = new Rx.Subject(1)

    main$
     .scan((a, c) => a.concat(c.shareReplay(1)), [])
     .map(obs => Rx.Observable.combineLatest(obs))
     .switch()
     .map(v => v.filter(e => !!e))
     .map(v => v.join(','))
     .subscribe(v => $('#result').append('<br>' + v))

    main$.onNext(s1$)
    s1$.onNext(1)
    main$.onNext(s2$)
    s2$.onNext(void 0) // Since we can't use startWith
    main$.onNext(s3$)
    s3$.onNext(5)
    s1$.onNext(55)
    s2$.onNext(12)
    s2$.onNext(14)
    s3$.onNext(6)
    main$.onNext(s4$)
    s4$.onNext(999)

    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
    <div id="result"></div>

这篇关于有没有办法创建这个流序列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆