有一种方法可以创建这个Streams序列吗? [英] There's a way of create this sequence of Streams?

查看:68
本文介绍了有一种方法可以创建这个Streams序列吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现这个大理石图,其中hipotesis有N个sN $,我将这些流添加到主$。

I'm trying to implement this marble diagram, with the hipotesis of have a N number of sN$, and I'm adding this streams to the main$.

s1$    +--1--------------------99--------------------->
s2$    +------3--------7------------------------------>

main$  +---[1]-[1, 3]---[1, 7]---[99, 7]-------------->

现在我有一个aproximation,但是有了重复

Right now I have a aproximation, but with the "repetitions"

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()

main$
  .scan((a, c) => [...a, c], [])
  .subscribe(v => console.log(v))

s1$.subscribe(x => main$.onNext(x))
s2$.subscribe(x => main$.onNext(x))    

s1$.onNext(3)
s2$.onNext(1)

s1$.onNext(6)
s2$.onNext(44)

/*
  Expect:
    [3]
    [3, 1]
    [6, 1]
    [6, 44]
*/

/*
  What I have:
     [3]
     [3, 1]
     [3, 1, 6]
     [3, 1, 6, 44]
*/

有办法做到这一点吗?
我还试图将流sN $添加到主$:

There's a way of doing this? Also I tried to add the streams sN$ into main$:

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()

main$
  .mergeAll()
  .scan((a, c) => [...a, c], [])
  .subscribe(
    (v) => console.log(v)
  )

main$.onNext(s1$)
main$.onNext(s2$)

s1$.onNext(3)
s2$.onNext(1)

s1$.onNext(6)
s2$.onNext(44)


推荐答案

您可以使用 combineLatest 。虽然仍然需要每个流都以一个值开头,但您可以在 null 前添加前缀,以使每个流以 startWith

You can use combineLatest. While that still require every stream to start with a value, you can prefix a null value to make every stream start with something using startWith.

const source = Rx.Observable.combineLatest(
  s1.startWith(void 0),
  s2.startWith(void 0),
  s3.startWith(void 0),
  (s1, s2, s3) => [s1, s2, s3])

可选您可以删除 undefined 结果数组中的值。

Optional you can remove undefined values from the resulting array.

现在,我们可以将其扩展为使用变量流列表。积分给@xgrommx。

Now, we can extend that to work with a variable list of streams. Credits to @xgrommx.

main$
 .scan((a, c) => a.concat(c), [])
 .switch(obs => Rx.Observable.combineLatest(obs))

当我们切换 c.shareReplay(1)让流记住最后一个值>。但是,不会与 c.startWith(void 0)结合使用,所以我们可以使用其中一个。

We can also use c.shareReplay(1) to make streams remember there last value when we switch. That however, wont combine with c.startWith(void 0), so we can use either one or the other.

示例:

    const main$ = new Rx.Subject()
    const s1$ = new Rx.Subject(1)
    const s2$ = new Rx.Subject(1)
    const s3$ = new Rx.Subject(1)
    const s4$ = new Rx.Subject(1)

    main$
     .scan((a, c) => a.concat(c.shareReplay(1)), [])
     .map(obs => Rx.Observable.combineLatest(obs))
     .switch()
     .map(v => v.filter(e => !!e))
     .map(v => v.join(','))
     .subscribe(v => $('#result').append('<br>' + v))

    main$.onNext(s1$)
    s1$.onNext(1)
    main$.onNext(s2$)
    s2$.onNext(void 0) // Since we can't use startWith
    main$.onNext(s3$)
    s3$.onNext(5)
    s1$.onNext(55)
    s2$.onNext(12)
    s2$.onNext(14)
    s3$.onNext(6)
    main$.onNext(s4$)
    s4$.onNext(999)

    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
    <div id="result"></div>

这篇关于有一种方法可以创建这个Streams序列吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆