WhenAll on a large number of Tasks


Question

I need your help to find the best solution. This is my original code:

public async Task Test()
{
    var tasks = new List<Task>();
    string line;
    using (var streamReader = File.OpenText(InputPath))
    {
        while ((line = streamReader.ReadLine()) != null)
        {
            tasks.Add(Process(line));
        }
    }

    await Task.WhenAll(tasks.ToArray());
}

private Task Process(string line)
{
    return Task.Run(() =>
    {
        Console.WriteLine(line);
    });
}

It reads a file and processes each line in its own task. However, if the file has more than 1 million lines, the list of tasks becomes very large. Is this code still fine, or should I look for another solution? Please help me. Thanks.

Recommended answer

That's a bad idea. It queues one task per line all at once, and that could launch far too many threads.

A much better way to do this is to simply use Parallel.ForEach(), like so:

using System;
using System.IO;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            string filename = @"Your test filename goes here";
            Parallel.ForEach(File.ReadLines(filename), process);
        }

        private static void process(string line)
        {
            Console.WriteLine(line);
        }
    }
}

This doesn't use async/await, however. If you need that, you can wrap the entire call to Parallel.ForEach() in a task.
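As a minimal sketch of that wrapping, the helper below pushes the blocking Parallel.ForEach() call onto a thread-pool thread with Task.Run() so the caller can await it. The class name ParallelWrapper and the method name ProcessLinesAsync are hypothetical and not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Demo
{
    static class ParallelWrapper
    {
        // Hypothetical helper (an assumption, not from the original answer).
        // Parallel.ForEach() blocks until every item has been processed, so
        // running it inside Task.Run() gives the caller something to await.
        public static Task ProcessLinesAsync(IEnumerable<string> lines, Action<string> process)
        {
            return Task.Run(() =>
            {
                Parallel.ForEach(lines, process);
            });
        }
    }
}
```

From an async method this could be called as `await ParallelWrapper.ProcessLinesAsync(File.ReadLines(filename), process);`. The work itself is still synchronous; the wrapper only keeps the calling thread free while it runs.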

Alternatively, if you want to use the TPL Dataflow library (the System.Threading.Tasks.Dataflow NuGet package from Microsoft), you can do something like this:

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            Task.Run(test).Wait();
        }

        static async Task test()
        {
            string filename = @"Your filename goes here";
            await processFile(filename);
        }

        static async Task processFile(string filename)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
            var action = new ActionBlock<string>(s => process(s), options);

            foreach (var line in File.ReadLines(filename))
                await action.SendAsync(line);

            action.Complete();

            await action.Completion;
        }

        static void process(string line)
        {
            Thread.Sleep(100);  // Simulate work.
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " " + line);
        }
    }
}

This gives you async support.
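If the per-line work is itself asynchronous, ActionBlock<T> also accepts a Func<T, Task> delegate, so the same bounded pipeline can await each item. The following is a sketch under that assumption; the names AsyncDataflow and ProcessLinesAsync are hypothetical, and the option values are simply copied from the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    static class AsyncDataflow
    {
        // Hypothetical helper (an assumption, not from the original answer).
        // processAsync is awaited by the block for each line.
        public static async Task ProcessLinesAsync(IEnumerable<string> lines, Func<string, Task> processAsync)
        {
            // At most 8 lines are processed concurrently and at most 100
            // are buffered inside the block at any time.
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
            var action = new ActionBlock<string>(processAsync, options);

            // SendAsync() waits while the block is full, so the whole input
            // is never held in memory as pending work.
            foreach (var line in lines)
                await action.SendAsync(line);

            action.Complete();
            await action.Completion;
        }
    }
}
```

A call might look like `await AsyncDataflow.ProcessLinesAsync(File.ReadLines(filename), async s => { await Task.Delay(100); Console.WriteLine(s); });`. The bounded capacity is what prevents a million-line file from turning into a million pending work items.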

Addendum: A demonstration of threadpool throttling.

(This is in response to shay__'s comments.)

If you start a lot of long-running tasks, where each task takes longer than a second or so to run, you may see threadpool throttling.

This happens when the number of threadpool threads in the current process equals or exceeds the worker count returned by a call to ThreadPool.GetMinThreads(out workers, out ports).

When this happens, the thread pool waits a short while (one second on my system) before creating a new threadpool thread. Often another threadpool thread becomes available during that delay and is used instead, which of course is a major reason for the throttling.

The following code demonstrates the issue:

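// Run as top-level statements (C# 9+) or paste the body into Main().
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

int workers, ports;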
ThreadPool.GetMinThreads(out workers, out ports);
Console.WriteLine("Min workers = " + workers); // Prints 8 on my system.
var sw = Stopwatch.StartNew();

for (int i = 0; i < 100; ++i)
{
    Task.Run(() =>
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} started at time {sw.Elapsed}");
        Thread.Sleep(10000);
    });
}

Console.ReadLine();

On my system, this prints the following:

Min workers = 8
Thread 3 started at time 00:00:00.0098651
Thread 6 started at time 00:00:00.0098651
Thread 8 started at time 00:00:00.0099841
Thread 5 started at time 00:00:00.0099680
Thread 7 started at time 00:00:00.0099918
Thread 4 started at time 00:00:00.0098739
Thread 10 started at time 00:00:00.0100828
Thread 9 started at time 00:00:00.0101833
Thread 11 started at time 00:00:01.0096247
Thread 12 started at time 00:00:02.0098105
Thread 13 started at time 00:00:03.0099824
Thread 14 started at time 00:00:04.0100671
Thread 15 started at time 00:00:05.0098035
Thread 16 started at time 00:00:06.0099449
Thread 17 started at time 00:00:07.0096293
Thread 18 started at time 00:00:08.0106774
Thread 19 started at time 00:00:09.0098193
Thread 20 started at time 00:00:10.0104156
Thread 3 started at time 00:00:10.0109315
Thread 8 started at time 00:00:10.0112171
Thread 7 started at time 00:00:10.0112531
Thread 9 started at time 00:00:10.0117256
Thread 4 started at time 00:00:10.0117920
Thread 10 started at time 00:00:10.0117298
Thread 6 started at time 00:00:10.0109381
Thread 5 started at time 00:00:10.0112276
Thread 21 started at time 00:00:11.0095859
Thread 11 started at time 00:00:11.0101189
Thread 22 started at time 00:00:12.0095421
Thread 12 started at time 00:00:12.0111173
Thread 23 started at time 00:00:13.0095932    ...

Note how the first 8 threads start almost immediately, but new threads are then throttled to about one per second until the first batch of threads finish (after 10 seconds) and can be reused.

Also note that this effect only occurs if the tasks take a relatively long time to complete.
