如何限制 C# 中的最大并行任务数 [英] How to limit the Maximum number of parallel tasks in c#

查看:16
本文介绍了如何限制 C# 中的最大并行任务数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我要处理 1000 条输入消息的集合.我正在循环输入集合并开始处理每条消息的新任务.

I have a collection of 1000 input message to process. I'm looping the input collection and starting the new task for each message to get processed.

//Assume this messages collection contains 1000 items
var messages = new List<string>();

foreach (var msg in messages)
{
   Task.Factory.StartNew(() =>
   {
    Process(msg);
   });
 }

我们能猜出当时最多同时处理多少条消息吗(假设是普通的四核处理器),还是可以限制一次处理的最大消息数?

Can we guess how many maximum messages simultaneously get processed at the time (assuming normal Quad core processor), or can we limit the maximum number of messages to be processed at the time?

如何确保以与集合相同的顺序/顺序处理此消息?

How to ensure this message get processed in the same sequence/order of the Collection?

推荐答案

在这种情况下,SemaphoreSlim 是一个非常好的解决方案,我强烈推荐 OP 尝试此操作,但 @Manoj 的回答有评论中提到的缺陷.应该等待信号量在生成这样的任务之前.

SemaphoreSlim is a very good solution in this case and I higly recommend OP to try this, but @Manoj's answer has flaw as mentioned in comments.semaphore should be waited before spawning the task like this.

更新答案: 正如@Vasyl 指出的那样,信号量可能会在任务完成之前被释放,并且会在退出 using 块之前调用 Release() 方法时引发异常必须等待所有创建的任务完成.

Updated Answer: As @Vasyl pointed out Semaphore may be disposed before completion of tasks and will raise exception when Release() method is called so before exiting the using block must wait for the completion of all created Tasks.

int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach(var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

回复评论对于那些想看看如何在没有 Task.WaitAll 的情况下处理信号量的人在控制台应用程序中运行以下代码,将引发此异常.

Answer to Comments for those who want to see how semaphore can be disposed without Task.WaitAll Run below code in console app and this exception will be raised.

System.ObjectDisposedException: '信号量已被释放.'

System.ObjectDisposedException: 'The semaphore has been disposed.'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{            
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}

这篇关于如何限制 C# 中的最大并行任务数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆