如何将多个可观察变量与订单保存和最大并发合并? [英] How to merge multiple observables with order preservation and maximum concurrency?

查看:49
本文介绍了如何将多个可观察变量与订单保存和最大并发合并?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我搜索了一个重复项,但没有找到任何重复项.我所拥有的是嵌套的可观察的 IObservable< IObservable< T>> ,我想将其展平为 IObservable< IObservable< T> .我不想使用

I searched for a duplicate and didn't find any. What I have is a nested observable IObservable<IObservable<T>>, and I want to flatten it to a IObservable<T>. I don't want to use the Concat operator because it delays the subscription to each inner observable until the completion of the previous observable. This is a problem because the inner observables are cold, and I want them to start emitting T values immediately after they are emitted by the outer observable. I also don't want to use the Merge operator because it messes the order of the emitted values. The marble diagram below shows the problematic (for my case) behavior of the Merge operator, as well as the Desirable merging behavior.

Stream of observables: +--1---2---3--|
Observable-1         :    +-A----------B-----|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F----|
Merge                : +----A----C--E--B--D--F----|
Desirable merging    : +----A----------BC-DE-F----|

Observable-1发出的所有值都应在Observable-2发出的任何值之前.对于Observable-2和Observable-3,同样如此.

All values emitted by the Observable-1 should precede any value emitted by the Observable-2. The same should be true with the Observable-2 and Observable-3, and so on.

我喜欢使用 Merge 运算符的地方在于,它允许配置内部可观察对象的最大并发订阅数.我想使用我尝试实现的自定义 MergeOrdered 运算符保留此功能.这是我的施工方法:

What I like with the Merge operator is that it allows to configure the maximum concurrent subscriptions to inner observables. I would like to preserve this functionality with the custom MergeOrdered operator I am trying to implement. Here is my under-construction method:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Merge(maximumConcurrency); // How to make it ordered?
}

这是一个用法示例:

var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"{x + 1}-{(char)(65 + y)}")
        .Take(3));

var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

输出(不希望的):

Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

理想的输出是:

Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C


说明:关于值的顺序,值本身无关紧要.重要的是它们起源的内部序列的顺序以及它们在该序列中的位置.应该首先发出第一个内部序列中的所有值(以其原始顺序),然​​后是第二个内部序列中的所有值,然后是第三个内部序列中的所有值,等等.


Clarification: Regarding the ordering of the values, the values themselves are irrelevant. What matters is the order of their originated inner sequence, and their position in that sequence. All values from the first inner sequence should be emitted first (in their original order), then all the values from the second inner sequence, then all the values from the third, etc.

推荐答案

我通过以可控制的方式预热(发布)内部序列,找到了解决此问题的方法.此解决方案使用 <用于控制温度的code> Replay 运算符,以及 SemaphoreSlim 用于控制并发.最终的 Concat 运算符确保每个内部序列的值都将按期望的顺序(顺序)发出.

I figured out a solution to this problem, by warming (publishing) the inner sequences in a controllable manner. This solution uses the Replay operator for controlling the temperature, and a SemaphoreSlim for controlling the concurrency. The final Concat operator ensures that the values of each inner sequence will be emitted in the desirable order (sequentially).

/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, preserving the order of the elements based on the order of their
/// originated sequence, limiting the number of concurrent subscriptions to inner
/// sequences.
/// </summary>
public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(maximumConcurrency);
        return source.Select(inner =>
        {
            var published = inner.Replay();
            _ = semaphore.WaitAsync().ContinueWith(_ => published.Connect(),
                TaskScheduler.Default);
            return published.Finally(() => semaphore.Release());
        })
        .Concat();
    });
}

使用 Defer 运算符是为了使每个订阅具有不同的 SemaphoreSlim ().将相同的 SemaphoreSlim 与多个订阅一起使用可能会出现问题.

The Defer operator is used in order to have a different SemaphoreSlim for each subscription (reference). Using the same SemaphoreSlim with multiple subscriptions could be problematic.

这不是一个完美的解决方案,因为没有理由发布 Concat 当前订阅的内部序列.优化这种效率低下并不是一件容易的事,所以我将其保持不变.

This is not a perfect solution because there is no reason for the inner sequence currently subscribed by the Concat to be published. Optimizing this inefficiency in not trivial though, so I'll leave it as is.

这篇关于如何将多个可观察变量与订单保存和最大并发合并?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆