Running a long-running parallel task in the background, while allowing small async tasks to update the foreground

Question

I have around 10 000 000 tasks that each take 1-10 seconds to complete. I am running those tasks on a powerful server, using 50 different threads, where each thread picks the first not-done task, runs it, and repeats.

Pseudocode:

for i = 0 to 50:
    run a new thread:
        while True:
            task = first available task
            if no available tasks: exit thread
            run task

Using this code, I can run all the tasks in parallel on any given number of threads.

In reality, the code uses C#'s Task.WhenAll, and looks like this:

ServicePointManager.DefaultConnectionLimit = threadCount; //Allow more HTTP request simultaneously
var currentIndex = -1;
var threads = new List<Task>(); //List of threads
for (int i = 0; i < threadCount; i++) //Generate the threads
{
    var wc = CreateWebClient();
    threads.Add(Task.Run(() =>
    {
        while (true) //Each thread should loop, picking the first available task, and executing it.
        {
            var index = Interlocked.Increment(ref currentIndex);
            if (index >= tasks.Count) break;
            var task = tasks[index];
            RunTask(conn, wc, task, port);
        }
    }));
}

await Task.WhenAll(threads);

This works just as I wanted it to, but I have a problem: since this code takes a long time to run, I want the user to see some progress. The progress is displayed as a colored bitmap (representing a matrix), which itself takes a few seconds to generate.

Therefore, I want to generate this visualization on a background thread. But this other background thread is never executed. My suspicion is that it uses the same thread pool as the parallel code, so it is enqueued and will not run until the parallel code has actually finished. (And that's a bit too late.)

Here's an example of how I generate the progress visualization:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e)
{
    var bitmap = await Task.Run(() => // <<< This task is never executed!
    {
        //bla, bla, various database calls, and generating a relatively large bitmap
    });

    //Convert the bitmap into a WPF image, and update the GUI
    VisualizationImage = BitmapToImageSource(bitmap);
}

So, how could I best solve this problem? I could create a list of Tasks, where each Task represents one of my tasks, run them with Parallel.Invoke, and pick another thread pool (I think). But then I would have to generate 10 million Task objects, instead of just 50 Task objects running through my array of work items. That sounds like it uses much more RAM than necessary. Any clever solutions to this?

As Panagiotis Kanavos suggested in one of his comments, I tried replacing some of my loop logic with an ActionBlock, like this:

// Create an ActionBlock<ZoneTask> that performs some work.
var workerBlock = new ActionBlock<ZoneTask>(
t =>
{
    var wc = CreateWebClient(); //This probably generates some unnecessary overhead, but that's a problem I can solve later.
    RunTask(conn, wc, t, port);
},
// Specify a maximum degree of parallelism. 
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = threadCount
});

foreach (var t in tasks) //Note: the objects in the tasks array are not Task objects
    workerBlock.Post(t);
workerBlock.Complete();

await workerBlock.Completion;

Note: RunTask just performs a web request using the WebClient, and parses the results. There is nothing in there that could cause a deadlock.

This seems to work like the old parallel code, except that it needs a minute or two for the initial foreach loop to post the tasks. Is this delay really worth it?

Nevertheless, my progress task still seems to be blocked. I am ignoring the Progress<T> suggestion for now, since this reduced code still suffers from the same problem:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("This happens");
    var bitmap = await Task.Run(() =>
    {
        Debug.WriteLine("This does not!");
        //Still doing some work here, so it's not optimized away.
    });

    VisualizationImage = BitmapToImageSource(bitmap);
}

So it still looks like new tasks are not executed as long as the parallel task is running. I even reduced MaxDegreeOfParallelism from 50 to 5 (on a 24-core server) to see if Peter Ritchie's suggestion was right, but there was no change. Any other suggestions?

Another update:

The issue seems to have been that I overloaded the thread pool with all my simultaneous blocking I/O calls. I replaced WebClient with HttpClient and its async functions, and now everything seems to be working nicely.
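The change described above can be sketched roughly as follows. This is a minimal sketch and not the asker's actual code: the AsyncWorkers class, RunAllAsync, and RunTaskAsync are hypothetical names, and the parsing step is left as a placeholder. It keeps the original "pull the next index" loop, but each worker awaits its request instead of blocking a thread-pool thread on it.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncWorkers
{
    // A single shared HttpClient; it supports concurrent requests.
    private static readonly HttpClient Http = new HttpClient();

    // Hypothetical async counterpart of RunTask: fetch one URL, then parse it.
    public static async Task RunTaskAsync(string url)
    {
        // While the request is in flight, no thread-pool thread is blocked.
        string body = await Http.GetStringAsync(url);
        // ... parse body and store the result (placeholder) ...
    }

    // Starts workerCount loops; each loop claims the next unprocessed index.
    public static Task RunAllAsync<T>(
        IReadOnlyList<T> items, int workerCount, Func<T, Task> runItemAsync)
    {
        int currentIndex = -1;
        var workers = new List<Task>();
        for (int i = 0; i < workerCount; i++)
        {
            workers.Add(Task.Run(async () =>
            {
                while (true)
                {
                    int index = Interlocked.Increment(ref currentIndex);
                    if (index >= items.Count) break;
                    await runItemAsync(items[index]);
                }
            }));
        }
        return Task.WhenAll(workers);
    }
}
```

Assuming a list of URLs named urls, a call might look like await AsyncWorkers.RunAllAsync(urls, 50, AsyncWorkers.RunTaskAsync). Because the workers spend most of their time awaiting I/O, the thread pool stays free for short jobs such as the Task.Run call in the refresh handler.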

Thanks to everyone for the great suggestions! Even though not all of them directly solved the problem, I'm sure they all improved my code. :)

Recommended answer

.NET already provides a mechanism to report progress with the IProgress<T> interface and the Progress<T> implementation.

The IProgress<T> interface allows clients to publish messages with the Report(T) method without having to worry about threading. The implementation ensures that the messages are processed on the appropriate thread, e.g. the UI thread. By using the simple IProgress<T> interface, the background methods are decoupled from whoever processes the messages.

You can find more information in the article "Async in 4.5: Enabling Progress and Cancellation in Async APIs". The cancellation and progress APIs aren't specific to the TPL. They can be used to simplify cancellation and reporting even for raw threads.

Progress<T> processes messages on the thread on which it was created (more precisely, on the SynchronizationContext it captured at construction). The handler is supplied either by passing a delegate when the class is instantiated, or by subscribing to its ProgressChanged event. Copying from the article:

private async void Start_Button_Click(object sender, RoutedEventArgs e)
{
    //construct Progress<T>, passing ReportProgress as the Action<T> 
    var progressIndicator = new Progress<int>(ReportProgress);
    //call async method
    int uploads=await UploadPicturesAsync(GenerateTestImages(), progressIndicator);
}

where ReportProgress is a method that accepts an int parameter. It could also accept a more complex class that reports work done, messages, etc.
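As an illustration of the "complex class" variant, here is a minimal sketch. WorkProgress, ProgressDemo, DoWork, and showStatus are hypothetical names that do not come from the article; only IProgress<T>, Progress<T>, and its ProgressChanged event are real .NET APIs. The sketch also uses the event subscription rather than the constructor delegate.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical report type carrying more than a single int.
public sealed class WorkProgress
{
    public WorkProgress(int completed, int total, string message)
    {
        Completed = completed;
        Total = total;
        Message = message;
    }

    public int Completed { get; }
    public int Total { get; }
    public string Message { get; }

    // Integer percentage; guards against an empty workload.
    public int Percent => Total == 0 ? 0 : Completed * 100 / Total;
}

public static class ProgressDemo
{
    // The background side only depends on IProgress<WorkProgress>.
    public static void DoWork(int total, IProgress<WorkProgress> progress)
    {
        for (int i = 1; i <= total; i++)
        {
            // ... one unit of work would go here ...
            progress?.Report(new WorkProgress(i, total, $"Finished item {i}"));
        }
    }

    // Call this from the UI thread so the handler runs there as well.
    public static async Task RunAsync(Action<string> showStatus)
    {
        var progress = new Progress<WorkProgress>();
        progress.ProgressChanged += (sender, p) =>
            showStatus($"{p.Percent}%: {p.Message}");
        await Task.Run(() => DoWork(100, progress));
    }
}
```

The report object is created once per message and not modified afterwards, which makes it safe to hand from the worker thread to the UI thread.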

The asynchronous method only has to call IProgress<T>.Report, e.g.:

async Task<int> UploadPicturesAsync(List<Image> imageList, IProgress<int> progress)
{
        int totalCount = imageList.Count;
        //The lambda must be async, because it awaits each upload
        int processCount = await Task.Run<int>(async () =>
        {
            int tempCount = 0;
            foreach (var image in imageList)
            {
                //await the processing and uploading logic here
                int processed = await UploadAndProcessAsync(image);
                tempCount++;
                //Report the percentage completed so far
                if (progress != null)
                {
                    progress.Report((tempCount * 100 / totalCount));
                }
            }

            return tempCount;
        });
        return processCount;
}

This decouples the background method from whoever receives and processes the progress messages.
