系统中的并发订阅服务器执行。反应 [英] Concurrent subscriber execution in System.Reactive

查看:0
本文介绍了系统中的并发订阅服务器执行。反应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个批处理管道,它每Y秒处理X个未完成的操作。我觉得System.Reactive很适合这一点,但我无法让订阅者并行执行。我的代码如下所示:

var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
 });
subject.OnCompleted();

有没有一种优雅的方式以并发方式从此缓冲区Subject读取?

推荐答案

处方订阅代码始终?同步。您需要做的是从Subscribe委托中删除处理代码,并使其成为可观察序列的副作用。以下是如何做到这一点的:

Subject<int> subject = new();
int concurrentCount = 0;

var processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.Defer(() => Observable.Start(() =>
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask(); // or RunAsync (either one starts the processor)

Parallel.For(0, 1_000_000, new() { MaxDegreeOfParallelism = 2 }, i =>
{
    subject.OnNext(i);
});
subject.OnCompleted();

processor.Wait();
Select+Observable.Defer+Observable.Start组合将源序列转换为IObservable<IObservable<Unit>>。它是一个嵌套序列,每个内部序列代表一个list的处理。当Observable.Start的委托完成时,内部序列发出Unit值,然后完成。包装Defer操作符确保内部序列是&COLD";,因此它们在订阅之前不会被启动。然后跟在Merge操作符后面,该操作符将外部序列展开为平面IObservable<Unit>序列。maxConcurrent参数用于配置将同时订阅多少个内部序列。每次Merge运算符订阅内部序列时,对应的Observable.Start委托开始在ThreadPool线程上运行。

如果您将maxConcurrent设置得太高,ThreadPool可能会用完工作进程(换句话说,它可能会饱和),并且 然后,代码的并发性将依赖于ThreadPool的可用性。如果您愿意,可以使用ThreadPool.SetMinThreads方法增加ThreadPool按需立即创建的工作进程数。但是,如果您的工作负载是受CPU限制的,并且您将工作线程增加到Environment.ProcessorCount值以上,那么很可能您的CPU将会饱和。

如果您的工作负载是异步的,您可以将Observable.Defer+Observable.Start组合替换为Observable.FromAsync运算符,如here所示。

存在一个unpublishedAsyncRx.NET,它采用了异步订阅的思想。它基于新接口IAsyncObservable<T>IAsyncObserver<T>

这篇关于系统中的并发订阅服务器执行。反应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆