Rx:一个类似zip的运算符,在其中一个流结束后会继续吗? [英] Rx: a zip-like operator that continues after one of the streams ended?

查看:103
本文介绍了Rx:一个类似zip的运算符,在其中一个流结束后会继续吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望合并异步开始和结束的流(可观察到的):

I'm looking to combine streams (observables) that start and end asynchronously:

-1----1----1----1---|->
     -2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->

我需要什么:将音频流添加在一起.它们是音频块"的流,但是我将在这里用整数表示它们.因此,有第一个剪辑正在播放:

What I need it for: Adding audio streams together. They're streams of audio "chunks", but I'm going to represent them with integers here. So there's the first clip playing:

-1----1----1----1---|->

然后第二个开始,稍后:

and then a second one starts, a bit later:

     -2----2--|->

按总和组合的结果应为:

The result of combining them by sum should be:

-1----3----3----1---|->

但是,如果任何压缩流结束,则标准zip会完成.我希望这个optional_zip能够继续进行,即使其中一个流结束了.是否有任何方法可以在Rx中执行此操作,还是必须自己通过修改现有的Zip来实现?

But the standard zip completes if any of the zipped streams end. I want this optional_zip to keep going even if one of the streams ends. Is there any way of doing this in Rx, or do I have to implement it myself by modifying the existing Zip?

注意:我正在使用RxPy,但是这里的社区似乎很小,并且Rx运算符似乎在所有语言中都很通用,因此我也将其标记为rx-java和rx-js.

note: I'm using RxPy, but the community here seems small and Rx operators seem to be pretty universal across languages, so I tagged it as rx-java and rx-js too.

推荐答案

我将这个问题分为两部分来解决.首先,我想要一个Observable<Observable<T>>并产生一个Observable<Observable<T>[]>的东西,其中数组仅包含活动"(即非完整)的可观察对象.每当将新元素添加到外部可观察物,并且每次内部可观察物之一完成时,都会发出包含适当可观察物的新数组.这本质上是对主流的扫描"减少.

I'd tackle this problem by breaking it into two parts. First, I'd want something that takes an Observable<Observable<T>> and produces an Observable<Observable<T>[]> where the array contains only the "active" (i.e. non-complete) observables. Any time a new element is added to the outer observable, and any time one of the inner observables completes, a new array would be emitted containing the appropriate observables. This is essentially a "scan" reduction of the primary stream.

一旦您可以做到这一点,就可以使用flatMapLatest和zip来获取所需的内容.

Once you've got something that can do that, you can use flatMapLatest and zip to get what you want.

我在第一部分的基本尝试如下:

My basic attempt at the first part is as follows:

function active(ss$) {
    const activeStreams = new Rx.Subject();
    const elements = [];
    const subscriptions = [];

    ss$.subscribe(s => {
        var include = true;
        const subscription = s.subscribe(x => {}, x => {}, x => {
            include = false;
            const i = elements.indexOf(s);
            if (i > -1) {
                elements.splice(i, 1);
                activeStreams.onNext(elements.slice());
            }
        });

        if (include) {
            elements.push(s);
            subscriptions.push(subscription);
            activeStreams.onNext(elements.slice());
        }   
    });

    return Rx.Observable.using(        
        () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
        () => activeStreams
    );
}

从那里,您只需将其压缩并展平,如下所示:

From there, you'd just zip it and flatten it out like so:

const zipped = active(c$).flatMapLatest(x =>
    x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);

我假设零个活动流不产生任何结果,一个活动流应产生其自己的元素,而两个或更多流应该全部拉在一起(所有这些都反映在地图应用程序中).

I've made the assumptions that zero active streams should yield nothing, one active stream should yield its own elements, and two or more streams should all zip together (all of which is reflected in the map application).

我的测试(相当有限)具有这种组合,可以产生您所追求的结果.

My (admittedly fairly limited) testing has this combination yielding the results you were after.

顺便问一个好问题.我还没有看到任何可以解决问题的第一部分的信息(尽管我绝不是Rx专家;如果有人知道已经做到了这一点,请发布详细信息).

Great question, by the way. I've not seen anything that solves the first part of the problem (though I'm by no means an Rx expert; if someone knows of something that already does this, please post details).

这篇关于Rx:一个类似zip的运算符,在其中一个流结束后会继续吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆