How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?

Problem description

I noticed that the Rx Merge operator accepts an optional maxConcurrent parameter. This can be used to limit the maximum concurrency, by subscribing concurrently to a limited number of subsequences. It works perfectly when new subsequences are pushed at a slower rate than the rate of the completion of the subscribed subsequences, but it becomes problematic when new subsequences are pushed faster than that. What happens is that the subsequences are buffered in an internal buffer with a forever increasing size, and also that the currently subscribed subsequences are becoming older and older. Here is a demonstration of this problem:

await Observable
    .Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}, " +
            $"Now: {DateTime.Now:HH:mm:ss.fff}, " +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);

A new subsequence is pushed every 10 milliseconds, and each subsequence completes after 1000 milliseconds. The subsequences are merged with maximum concurrency 1 (sequentially).
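As a rough back-of-the-envelope model of this demonstration (an illustration only, not part of the original question): with one arrival every 10 ms and each subsequence occupying the single slot for 1000 ms, roughly 99 out of every 100 arrivals have to wait. The hypothetical `BacklogModel.Backlog` helper below counts the waiting subsequences under idealized timing. It assumes the first subsequence arrives after one interval and ignores timer jitter and scheduling overhead.

```csharp
using System;

public static class BacklogModel
{
    // Idealized number of subsequences waiting inside Merge(maxConcurrent: 1)
    // after elapsedMs, assuming one arrival every arrivalMs and a fixed
    // lifetime of serviceMs per subsequence. This is a model, not a measurement.
    public static long Backlog(long elapsedMs, long arrivalMs, long serviceMs)
    {
        long arrived = elapsedMs / arrivalMs;
        if (arrived == 0) return 0;

        // The first subsequence is subscribed on arrival; each later one is
        // subscribed only when its predecessor completes.
        long subscribed = Math.Min(arrived, 1 + (elapsedMs - arrivalMs) / serviceMs);
        return arrived - subscribed;
    }
}
```

Under these assumptions, `BacklogModel.Backlog(10_000, 10, 1_000)` evaluates to 990: after ten seconds about a thousand subsequences have arrived but only ten have been subscribed, and the gap keeps widening for as long as the source keeps emitting.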

Output:

Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes

(Try it on Fiddle)

The memory usage grows steadily, and the time gap between the creation and subscription of each subsequence grows as well.

What I would like to have is a custom Merge variant that has an internal buffer with limited size. When the buffer is full, any incoming subsequence should cause the currently oldest buffered subsequence to be dropped. Here is a marble diagram of the desirable behavior, configured with maximum concurrency = 1 and buffer capacity = 1:

Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|

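  • The subsequence A is subscribed immediately after it is emitted.
  • Then B is emitted and stored in the buffer, because A has not completed yet.
  • Then C is emitted and replaces B in the buffer. As a result, the B subsequence is dropped and never subscribed.
  • As soon as the subsequence A completes, the buffered subsequence C is subscribed.
  • The final result contains the merged values emitted by the A and C subsequences.

How could I implement a custom Rx operator with this specific behavior? Here is the stub of the operator I am trying to implement: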

    public static IObservable<T> MergeBounded<T>(
        this IObservable<IObservable<T>> source,
        int maximumConcurrency,
        int boundedCapacity)
    {
        return source.Merge(maximumConcurrency);
        // TODO: enforce the boundedCapacity policy somehow
    }
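The buffering policy requested here (drop the oldest buffered item when a new one arrives at a full buffer) can be sketched in isolation, without any Rx types. The `DropOldestBuffer<T>` class below is a hypothetical helper written only to illustrate that policy. It is not an Rx operator, it is not thread-safe, and it is not part of the stub above.

```csharp
using System.Collections.Generic;

// Hypothetical illustration of the "drop oldest" policy; not thread-safe.
public sealed class DropOldestBuffer<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public DropOldestBuffer(int capacity) => _capacity = capacity;

    public int Count => _items.Count;

    // Adds an item. Returns true and outputs the evicted (oldest) item
    // when the addition pushed the buffer over its capacity.
    public bool Add(T item, out T dropped)
    {
        _items.Enqueue(item);
        if (_items.Count > _capacity)
        {
            dropped = _items.Dequeue();
            return true;
        }
        dropped = default;
        return false;
    }

    // Removes the oldest remaining item, if any.
    public bool TryTake(out T item) => _items.TryDequeue(out item);
}
```

With a capacity of 1, adding B and then C evicts B, and the next `TryTake` yields C, which matches the marble diagram: B is never subscribed and C is picked up once A completes.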
    

Recommended answer

I came up with a functional solution. I'm not sure it's the way to go, simply because of its complexity, but I think I covered all the bases.

First, if you take a functional approach, this is a relatively simple state-machine problem: the state needs to track how many observables are currently executing, plus the buffer queue. The two events that can affect the state are a new Observable entering the buffer queue (causes an enqueue on the buffer queue), or a currently-executing observable terminating (causes a dequeue on the buffer queue).

Since a state machine basically means Scan, and Scan can only work with one type, we'll have to coerce our two events into one type, which I called Message below. The state machine then knows everything it needs and can do the work of the Merge(n) overload.
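To make the state machine easier to follow, here is a minimal sketch of the transition as a pure function, separated from Rx. It is a simplified model under stated assumptions: subsequences are represented by plain strings, and the names `BoundedMergeModel`, `State`, `Kind` and `Step` are hypothetical and do not appear in the answer's code. The buffered count is kept next to the queue because `ImmutableQueue<T>` does not expose a count.

```csharp
using System.Collections.Immutable;

// Hypothetical, simplified model of the transition; strings stand in for observables.
public static class BoundedMergeModel
{
    public enum Kind { Arrived, Completed }

    public readonly struct State
    {
        public State(ImmutableQueue<string> buffer, int buffered, int running, string start)
        { Buffer = buffer; Buffered = buffered; Running = running; Start = start; }

        public ImmutableQueue<string> Buffer { get; }
        public int Buffered { get; }   // number of items in Buffer
        public int Running { get; }    // number of items currently executing
        public string Start { get; }   // item to subscribe as a result of the last event, or null
    }

    public static readonly State Initial =
        new State(ImmutableQueue<string>.Empty, 0, 0, null);

    public static State Step(State s, Kind kind, string item, int maxConcurrency, int capacity)
    {
        if (kind == Kind.Arrived)
        {
            // A free slot exists: start the new item immediately.
            if (s.Running < maxConcurrency)
                return new State(s.Buffer, s.Buffered, s.Running + 1, item);

            // Otherwise buffer it, dropping the oldest item if the buffer was full.
            var buffer = s.Buffer.Enqueue(item);
            return s.Buffered == capacity
                ? new State(buffer.Dequeue(), s.Buffered, s.Running, null)
                : new State(buffer, s.Buffered + 1, s.Running, null);
        }

        // A running item completed: free the slot, or hand it to the oldest buffered item.
        if (s.Buffered == 0)
            return new State(s.Buffer, 0, s.Running - 1, null);
        return new State(s.Buffer.Dequeue(), s.Buffered - 1, s.Running, s.Buffer.Peek());
    }
}
```

Replaying the marble diagram with maximum concurrency 1 and capacity 1 (A arrives, B arrives, C arrives, A completes), the `Start` values are A, null, null, C. Folding such a function over a single stream of events is the job Scan does in the Rx version.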

The last trick is the loop-back: since the completing Observable is 'downstream' from Scan, we need to 'loop back' the termination of that observable into Scan. For that, I always refer back to the Drain function in [this answer][1].

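    using System;
    using System.Collections.Immutable;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static class X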
    {
        public static IObservable<T> MergeBounded<T>(
            this IObservable<IObservable<T>> source,
            int maximumConcurrency,
            int boundedCapacity)
        {
            return Observable.Defer(() =>
            {
                var capacityQueue = new Subject<Unit>();
    
                var toReturn = source.Publish(_source => _source
                    .Select(o => Message.Enqueue(o))
                    .Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
                    .Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
                    {
                        var buffer = state.buffer;
                        var bufferCount = state.bufferCount;
                        var executionCount = state.executionCount;
                        if (message.IsEnqueue)
                        {
                            if (executionCount < maximumConcurrency)
                                return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);
    
                            buffer = buffer.Enqueue(message.Object);
                            if (bufferCount == boundedCapacity)
                                buffer = buffer.Dequeue();
                            else
                                bufferCount++;
                            return (bufferCount, buffer, executionCount, null);
                        }
                        else
                        {
                            if (bufferCount == 0)
                                return (0, buffer, executionCount - 1, null);
                            else
                                return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
                        }
                    })
                    .Where(t => t.item != null)
                    .Select(t => t.item)
                    .Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
                    .TakeUntil(_source.IgnoreElements().Materialize())
                    .Merge()
                );
    
                return toReturn;
            });
    
        }
    
        public class Message
        {
            public static Message<T> Enqueue<T>(T t)
            {
                return Message<T>.Enqueue(t);
            }
    
            public static Message<T> Dequeue<T>(T t)
            {
                return Message<T>.Dequeue(t);
            }
    
        }
    
        public class Message<T>
        {
            private readonly T _t;
            private readonly bool _isEnqueue;
            private Message(bool isEnqueue, T t)
            {
                _t = t;
                _isEnqueue = isEnqueue;
            }
            
            public static Message<T> Enqueue(T t)
            {
                return new Message<T>(true, t);
            }
    
            public static Message<T> Dequeue(T t)
            {
                return new Message<T>(false, t);
            }
            
            public bool IsEnqueue => _isEnqueue;
            public T Object => _t;
        }
    }
    

I wrote some test code (based on the original question) to verify, if you want to piggyback off of that. The test now passes:

    //              T: 0123456789012345678901234567890123
    //            T10: 0         1         2         3
    //         Source: +----A------B------C------|
    //              A:      +-------a----a---|
    //              B:             +----------b----b---|
    //              C:                    +--------c----|
    // ExpectedResult: +------------a----a---------c----|
    
    
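    // TestScheduler, ReactiveTest and ReactiveAssert come from the Microsoft.Reactive.Testing namespace.
    var ts = new TestScheduler();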
    
    var A = ts.CreateHotObservable(
        ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
    );
    var B = ts.CreateHotObservable(
        ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
        ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
        ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
    );
    var C = ts.CreateHotObservable(
        ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
        ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
    );
    var source = ts.CreateHotObservable(
        ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
        ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
        ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
        ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
    );
    var observer = ts.CreateObserver<string>();
    var testResult = source.MergeBounded(1, 1);
    testResult.Subscribe(observer);
    
    var expected = ts.CreateHotObservable(
        ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
        ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
        ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
    );
    ts.Start();
    //observer.Messages.Dump("Actual");   // Linqpad
    //expected.Messages.Dump("Expected"); // Linqpad
    ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);
    

(Test code passes without exception)
