如何在C#中限制并行任务的最大数量 [英] How to limit the Maximum number of parallel tasks in c#

查看:801
本文介绍了如何在C#中限制并行任务的最大数量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有1000条输入消息要处理。我正在循环输入集合并为要处理的每条消息启动新任务。

I have a collection of 1000 input message to process. I'm looping the input collection and starting the new task for each message to get processed.

//Assume this messages collection contains 1000 items
var messages = new List<string>();

foreach (var msg in messages)
{
   Task.Factory.StartNew(() =>
   {
    Process(msg);
   });
 }

我们能猜一次同时处理多少个最大消息(假设正常)四核处理器),还是我们可以限制当时要处理的最大消息数?

Can we guess how many maximum messages simultaneously get processed at the time (assuming normal Quad core processor), or can we limit the maximum number of messages to be processed at the time?

如何确保以集合的相同顺序/顺序处理此消息?

How to ensure this message get processed in the same sequence/order of the Collection?

推荐答案

在这种情况下,SemaphoreSlim是一个很好的解决方案,我强烈建议OP尝试此操作,但是@Manoj的答案存在注释中提到的缺陷。在生成这样的任务之前,应等待信号量。

SemaphoreSlim is a very good solution in this case and I higly recommend OP to try this, but @Manoj's answer has flaw as mentioned in comments.semaphore should be waited before spawning the task like this.

更新后的答案:正如@Vasyl指出的那样,信号量可能在任务完成之前被处置,并且在 Release()<时会引发异常。 / code>方法被调用,因此在退出using块之前必须等待所有创建的任务完成。

Updated Answer: As @Vasyl pointed out Semaphore may be disposed before completion of tasks and will raise exception when Release() method is called so before exiting the using block must wait for the completion of all created Tasks.

int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach(var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    Task.WaitAll(tasks.ToArray());
}

对这些问题的评论
谁想要看看如何在不使用 Task.WaitAll
的情况下处理信号量,请在控制台应用程序中运行以下代码,并将引发此异常。

Answer to Comments for those who want to see how semaphore can be disposed without Task.WaitAll Run below code in console app and this exception will be raised.


System.ObjectDisposedException:'信号量已被处置。'

System.ObjectDisposedException: 'The semaphore has been disposed.'



static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{            
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}

这篇关于如何在C#中限制并行任务的最大数量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆