如何合并并发和缓冲区容量有限的嵌套可观测IObservable<IObservable<T>>? [英] How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity?

查看:0
本文介绍了如何合并并发和缓冲区容量有限的嵌套可观测IObservable<IObservable<T>>?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我注意到RxMerge运算符接受一个可选的maxConcurrent参数。这可用于通过并发订阅有限数量的子序列来限制最大并发性。当推送新的子序列的速度慢于订阅的子序列的完成速度时,它工作得很好,但当推送新的子序列的速度快于此速度时,它就会变得有问题。发生的情况是,子序列被缓冲在大小不断增加的内部缓冲区中,并且当前订阅的子序列也变得越来越陈旧。以下是此问题的演示:

await Observable
    .Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}, " +
            $"Now: {DateTime.Now:HH:mm:ss.fff}, " +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);
每隔10毫秒推送一个新的子序列,每个子序列在1000毫秒后完成。子序列以最大并发数1合并(按顺序)。

输出:

Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes

(Try it on Fiddle)

内存使用量稳步增长,每个子序列的创建和订阅之间的时间间隔也越来越大。

我想要的是一个自定义的Merge变体,它有一个大小有限的内部缓冲区。当缓冲区已满时,任何传入的子序列都应该导致丢弃当前最旧的缓冲子序列。下面是所需行为的大理石图,配置了最大并发=1和缓冲区容量=1:

Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|
  • 子序列A在发出后立即被订阅。
  • 然后发出B并将其存储在缓冲区中,因为A尚未完成。
  • 然后发出C并替换缓冲区中的B。因此,B子序列被删除,并且从未被订阅。
  • 完成子序列A之后,立即订阅缓冲子序列C。
  • 最终结果包含A和C子序列发出的合并值。
如何实现具有此特定行为的自定义Rx运算符?下面是我尝试实现的操作符的存根:

public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency,
    int boundedCapacity)
{
    return source.Merge(maximumConcurrency);
    // TODO: enforce the boundedCapacity policy somehow
}

推荐答案

我想出了一个功能解决方案,但由于复杂性,我不确定它是否可行。但我想我已经做好了所有的准备。

首先,如果采用函数式方法,这是一个相对简单的状态机问题:状态需要知道当前有多少可观测对象正在执行和缓冲区队列。可能影响状态的两个事件是新的可观察对象进入缓冲区队列(导致缓冲区队列上的入队),或当前正在执行的可观察对象终止(导致缓冲区队列上的出队)。

由于状态机基本上意味着Scan,并且Scan只能处理一种类型,因此我们必须将两个事件强制为一种类型,下面我称之为Message。然后状态机知道所有情况,并可以执行Merge(n)重载的工作。

最后一个技巧是环回:因为完成的可观测对象是Scan的‘下游’,所以我们需要将该可观测对象的终止‘环回’到Scan中。为此,我总是参考[This Answer][1]中的Drain函数。

public static class X
{
    public static IObservable<T> MergeBounded<T>(
        this IObservable<IObservable<T>> source,
        int maximumConcurrency,
        int boundedCapacity)
    {
        return Observable.Defer(() =>
        {
            var capacityQueue = new Subject<Unit>();

            var toReturn = source.Publish(_source => _source
                .Select(o => Message.Enqueue(o))
                .Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
                .Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
                {
                    var buffer = state.buffer;
                    var bufferCount = state.bufferCount;
                    var executionCount = state.executionCount;
                    if (message.IsEnqueue)
                    {
                        if (executionCount < maximumConcurrency)
                            return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);

                        buffer = buffer.Enqueue(message.Object);
                        if (bufferCount == boundedCapacity)
                            buffer = buffer.Dequeue();
                        else
                            bufferCount++;
                        return (bufferCount, buffer, executionCount, null);
                    }
                    else
                    {
                        if (bufferCount == 0)
                            return (0, buffer, executionCount - 1, null);
                        else
                            return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
                    }
                })
                .Where(t => t.item != null)
                .Select(t => t.item)
                .Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
                .TakeUntil(_source.IgnoreElements().Materialize())
                .Merge()
            );

            return toReturn;
        });

    }

    public class Message
    {
        public static Message<T> Enqueue<T>(T t)
        {
            return Message<T>.Enqueue(t);
        }

        public static Message<T> Dequeue<T>(T t)
        {
            return Message<T>.Dequeue(t);
        }

    }

    public class Message<T>
    {
        private readonly T _t;
        private readonly bool _isEnqueue;
        private Message(bool isEnqueue, T t)
        {
            _t = t;
            _isEnqueue = isEnqueue;
        }
        
        public static Message<T> Enqueue(T t)
        {
            return new Message<T>(true, t);
        }

        public static Message<T> Dequeue(T t)
        {
            return new Message<T>(false, t);
        }
        
        public bool IsEnqueue => _isEnqueue;
        public T Object => _t;
    }
}

我编写了一些测试代码(基于原始问题)来验证,如果您想利用它的话。测试正在通过:

//              T: 0123456789012345678901234567890123
//            T10: 0         1         2         3
//         Source: +----A------B------C------|
//              A:      +-------a----a---|
//              B:             +----------b----b---|
//              C:                    +--------c----|
// ExpectedResult: +------------a----a---------c----|


var ts = new TestScheduler();

var A = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
    ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
    ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
    ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
    ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
    ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
    ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
    ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
    ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual");   // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);

(测试代码无异常通过)

这篇关于如何合并并发和缓冲区容量有限的嵌套可观测IObservable&amp;lt;IObservable&amp;lt;T&amp;gt;&amp;gt;?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆