系统中的并发订阅服务器执行。反应 [英] Concurrent subscriber execution in System.Reactive
本文介绍了系统中的并发订阅服务器执行。反应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在编写一个批处理管道,它每Y秒处理X个未完成的操作。我觉得System.Reactive
很适合这一点,但我无法让订阅者并行执行。我的代码如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
有没有一种优雅的方式以并发方式从此缓冲区Subject
读取?
推荐答案
处方订阅代码始终?同步。您需要做的是从Subscribe
委托中删除处理代码,并使其成为可观察序列的副作用。以下是如何做到这一点的:
Subject<int> subject = new();
int concurrentCount = 0;
var processor = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Select(list => Observable.Defer(() => Observable.Start(() =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
Interlocked.Decrement(ref concurrentCount);
})))
.Merge(maxConcurrent: 2)
.DefaultIfEmpty() // Prevents exception in corner case (empty source)
.ToTask(); // or RunAsync (either one starts the processor)
Parallel.For(0, 1_000_000, new() { MaxDegreeOfParallelism = 2 }, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
processor.Wait();
Select
+Observable.Defer
+Observable.Start
组合将源序列转换为IObservable<IObservable<Unit>>
。它是一个嵌套序列,每个内部序列代表一个list
的处理。当Observable.Start
的委托完成时,内部序列发出Unit
值,然后完成。包装Defer
操作符确保内部序列是&COLD";,因此它们在订阅之前不会被启动。然后跟在Merge
操作符后面,该操作符将外部序列展开为平面IObservable<Unit>
序列。maxConcurrent
参数用于配置将同时订阅多少个内部序列。每次Merge
运算符订阅内部序列时,对应的Observable.Start
委托开始在ThreadPool
线程上运行。
如果您将maxConcurrent
设置得太高,ThreadPool
可能会用完工作进程(换句话说,它可能会饱和),并且
然后,代码的并发性将依赖于ThreadPool
的可用性。如果您愿意,可以使用ThreadPool.SetMinThreads
方法增加ThreadPool
按需立即创建的工作进程数。但是,如果您的工作负载是受CPU限制的,并且您将工作线程增加到Environment.ProcessorCount
值以上,那么很可能您的CPU将会饱和。
如果您的工作负载是异步的,您可以将Observable.Defer
+Observable.Start
组合替换为Observable.FromAsync
运算符,如here所示。
IAsyncObservable<T>
和IAsyncObserver<T>
。
这篇关于系统中的并发订阅服务器执行。反应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文