Rx:在其中一个流结束后继续的类似 zip 的运算符? [英] Rx: a zip-like operator that continues after one of the streams ended?

查看:33
本文介绍了Rx:在其中一个流结束后继续的类似 zip 的运算符?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望合并异步开始和结束的流(可观察对象):

I'm looking to combine streams (observables) that start and end asynchronously:

-1----1----1----1---|->
     -2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->

我需要它做什么:将音频流添加在一起.它们是音频块"流,但我将在这里用整数表示它们.所以第一个剪辑正在播放:

What I need it for: Adding audio streams together. They're streams of audio "chunks", but I'm going to represent them with integers here. So there's the first clip playing:

-1----1----1----1---|->

然后第二个开始,稍后:

and then a second one starts, a bit later:

     -2----2--|->

将它们相加的结果应该是:

The result of combining them by sum should be:

-1----3----3----1---|->

但是如果任何压缩流结束,标准 zip 就会完成.即使其中一个流结束,我也希望这个 optional_zip 继续运行.在 Rx 中有没有办法做到这一点,还是我必须通过修改现有的 Zip 来实现它?

But the standard zip completes if any of the zipped streams end. I want this optional_zip to keep going even if one of the streams ends. Is there any way of doing this in Rx, or do I have to implement it myself by modifying the existing Zip?

注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 操作符似乎跨语言非常通用,所以我也将其标记为 rx-java 和 rx-js.

note: I'm using RxPy, but the community here seems small and Rx operators seem to be pretty universal across languages, so I tagged it as rx-java and rx-js too.

推荐答案

我会把这个问题分成两部分来解决.首先,我想要一些接受 Observable> 并产生一个 Observable[]> 的东西,其中数组只包含活动的"(即不完整的)可观察对象.任何时候一个新元素被添加到外部 observable 中,并且任何时候一个内部 observable 完成,一个包含适当的 observable 的新数组将被发出.这本质上是对主流的扫描"缩减.

I'd tackle this problem by breaking it into two parts. First, I'd want something that takes an Observable<Observable<T>> and produces an Observable<Observable<T>[]> where the array contains only the "active" (i.e. non-complete) observables. Any time a new element is added to the outer observable, and any time one of the inner observables completes, a new array would be emitted containing the appropriate observables. This is essentially a "scan" reduction of the primary stream.

一旦你有了可以做到这一点的东西,你就可以使用 flatMapLatest 和 zip 来获得你想要的东西.

Once you've got something that can do that, you can use flatMapLatest and zip to get what you want.

我在第一部分的基本尝试如下:

My basic attempt at the first part is as follows:

function active(ss$) {
    const activeStreams = new Rx.Subject();
    const elements = [];
    const subscriptions = [];

    ss$.subscribe(s => {
        var include = true;
        const subscription = s.subscribe(x => {}, x => {}, x => {
            include = false;
            const i = elements.indexOf(s);
            if (i > -1) {
                elements.splice(i, 1);
                activeStreams.onNext(elements.slice());
            }
        });

        if (include) {
            elements.push(s);
            subscriptions.push(subscription);
            activeStreams.onNext(elements.slice());
        }   
    });

    return Rx.Observable.using(        
        () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
        () => activeStreams
    );
}

从那里开始,您只需像这样将其压缩并压平即可:

From there, you'd just zip it and flatten it out like so:

const zipped = active(c$).flatMapLatest(x =>
    x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);

我已经假设零个活动流不应该产生任何东西,一个活动流应该产生它自己的元素,并且两个或多个流应该全部压缩在一起(所有这些都反映在地图应用程序中).

I've made the assumptions that zero active streams should yield nothing, one active stream should yield its own elements, and two or more streams should all zip together (all of which is reflected in the map application).

我的(公认相当有限的)测试表明这种组合产生了您想要的结果.

My (admittedly fairly limited) testing has this combination yielding the results you were after.

顺便说一句,好问题.我还没有看到任何解决问题的第一部分的方法(尽管我绝不是 Rx 专家;如果有人知道已经这样做的东西,请发布详细信息).

Great question, by the way. I've not seen anything that solves the first part of the problem (though I'm by no means an Rx expert; if someone knows of something that already does this, please post details).

这篇关于Rx:在其中一个流结束后继续的类似 zip 的运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆