如何在 Rx.NET 中以正确的方式约束并发 [英] How to constraint concurrency the right way in Rx.NET

查看:53
本文介绍了如何在 Rx.NET 中以正确的方式约束并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请注意以下代码片段:

var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();

此代码的问题在于 getResultAsync 以不受约束的方式并发运行.在某些情况下,这可能不是我们想要的.假设我想将其并发限制为最多 10 个并发调用.Rx.NET 的实现方式是什么?

The problem with this code is that getResultAsync runs concurrently in an unconstrained fashion. Which could be not what we want in certain cases. Suppose I want to restrict its concurrency to at most 10 concurrent invocations. What is the Rx.NET way to do it?

我附上了一个简单的控制台应用程序,它演示了所描述的问题的主题和我的蹩脚解决方案.

I am enclosing a simple console application that demonstrates the subject and my lame solution of the described problem.

有一些额外的代码,比如 Stats 类和人工随机睡眠.它们的存在是为了确保我真正获得并发执行,并能够可靠地计算在此过程中达到的最大并发数.

There is a bit extra code, like the Stats class and the artificial random sleeps. They are there to ensure I truly get concurrent execution and can reliably compute the max concurrency reached during the process.

RunUnconstrained 方法演示了简单的、无约束的运行.方法 RunConstrained 显示了我的解决方案,这不是很优雅.理想情况下,我想通过简单地将专用 Rx 运算符应用于 Monad 来减轻对并发的限制.当然,在不牺牲性能的情况下.

The method RunUnconstrained demonstrates the naive, unconstrained run. The method RunConstrained shows my solution, which is not very elegant. Ideally, I would like to ease constraining the concurrency by simply applying a dedicated Rx operator to the Monad. Of course, without sacrificing the performance.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace RxConstrainedConcurrency
{
    class Program
    {
        public class Stats
        {
            public int MaxConcurrentCount;
            public int CurConcurrentCount;
            public readonly object MaxConcurrentCountGuard = new object();
        }

        static void Main()
        {
            RunUnconstrained().GetAwaiter().GetResult();
            RunConstrained().GetAwaiter().GetResult();
        }
        static async Task RunUnconstrained()
        {
            await Run(AsyncOp);
        }
        static async Task RunConstrained()
        {
            using (var sem = new SemaphoreSlim(10))
            {
                await Run(async (s, pause, stats) =>
                {
                    // ReSharper disable AccessToDisposedClosure
                    await sem.WaitAsync();
                    try
                    {
                        return await AsyncOp(s, pause, stats);
                    }
                    finally
                    {
                        sem.Release();
                    }
                    // ReSharper restore AccessToDisposedClosure
                });
            }
        }
        static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
        {
            var stats = new Stats();
            var rnd = new Random(0x1234);
            var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
            Debug.Assert(stats.CurConcurrentCount == 0);
            Debug.Assert(result.Count == 1000);
            Debug.Assert(!result.Contains(0));
            Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
        }

        static IObservable<string> GetSource(int count)
        {
            return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
        }

        static Task<int> AsyncOp(string s, int pause, Stats stats)
        {
            return Task.Run(() =>
            {
                int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
                if (stats.MaxConcurrentCount < cur)
                {
                    lock (stats.MaxConcurrentCountGuard)
                    {
                        if (stats.MaxConcurrentCount < cur)
                        {
                            stats.MaxConcurrentCount = cur;
                        }
                    }
                }

                try
                {
                    Thread.Sleep(pause);
                    return int.Parse(s);
                }
                finally
                {
                    Interlocked.Decrement(ref stats.CurConcurrentCount);
                }
            });
        }
    }
}

推荐答案

您可以在 Rx 中使用 Merge 的重载来执行此操作,该重载限制对内部 observable 的并发订阅数.

You can do this in Rx using the overload of Merge that constrains the number of concurrent subscriptions to inner observables.

这种形式的Merge适用于一个流.

This form of Merge is applied to a stream of streams.

通常,使用 SelectMany 从事件中调用异步任务有两项工作:将每个事件投影到一个可观察的流中,其单个事件是结果,并将所有结果流展平在一起.

Ordinarily, using SelectMany to invoke an async task from an event does two jobs: it projects each event into an observable stream whose single event is the result, and it flattens all the resulting streams together.

要使用 Merge,我们必须使用常规的 Select 将每个事件投影到异步任务的调用中,(从而创建一个流),并使用 Merge 将结果展平.它将通过在任何时间点仅订阅提供的固定数量的内部流来以受限方式执行此操作.

To use Merge we must use a regular Select to project each event into the invocation of an async task, (thus creating a stream of streams), and use Merge to flatten the result. It will do this in a constrained way by only subscribing to a supplied fixed number of the inner streams at any point in time.

我们必须小心,只在订阅包装内部流时调用每个异步任务调用.使用 ToObservable() 将异步任务转换为 observable 实际上会立即调用异步任务,而不是在订阅时调用,因此我们必须推迟评估,直到使用 订阅>Observable.Defer.

We must be careful to only invoke each asynchronous task invocation upon subscription to the wrapping inner stream. Conversion of an async task to an observable with ToObservable() will actually call the async task immediately, rather than on subscription, so we must defer the evaluation until subscription using Observable.Defer.

以下是将所有这些步骤放在一起的示例:

Here's an example putting all these steps together:

void Main()
{
    var xs = Observable.Range(0, 10); // source events

    // "Double" here is our async operation to be constrained,
    // in this case to 3 concurrent invocations

    xs.Select(x =>
       Observable.Defer(() => Double(x).ToObservable())).Merge(3)
      .Subscribe(Console.WriteLine,
                 () => Console.WriteLine("Max: " + MaxConcurrent));


}

private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();

public async Task<int> Double(int x)
{
    var concurrent = Interlocked.Increment(ref Concurrent);
    lock(gate)
    {
        MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
    }

    await Task.Delay(TimeSpan.FromSeconds(1));

    Interlocked.Decrement(ref Concurrent);

    return x * 2;
}

这里的最大并发输出将为3".删除合并以不受约束",您将得到10".

The maximum concurrency output here will be "3". Remove the Merge to go "unconstrained" and you'll get "10" instead.

获得 Defer 效果的另一种(等效)方式读取更好一点是使用 FromAsync 而不是 Defer + ToObservable:

Another (equivalent) way of getting the Defer effect that reads a bit nicer is to use FromAsync instead of Defer + ToObservable:

xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)

这篇关于如何在 Rx.NET 中以正确的方式约束并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆