Using thousands of Tasks with a timeout efficiently


Problem Description

I am implementing a Library L that communicates via Sockets with another application A.

The basic workflow is as follows:

  1. L connects to A.
  2. L sends ~50,000 pieces of information I to A, and creates a Task T for every I that is sent out.
  3. L listens for incoming results from A and, once results arrive, uses a TaskCompletionSource to set the result of the corresponding Task T.
  4. L creates a Task T2 with a set timeout (Task.WhenAny(T, Task.Delay(xx))).
  5. L uses Task.WhenAll(T2) to wait for a timeout or results for all sent information.

Managing the underlying data structure is no problem at all. The main problem is that assembling the "main" Task.WhenAll(T2) takes around 5-6 seconds on my computer with ca. 50,000 entries (creating 50,000 * 2 + 1 tasks).
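
For illustration, this is roughly what that per-message pattern looks like; it is only a minimal sketch, and Message, Result, Send and the timeout value are hypothetical placeholders rather than the library's actual API:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Minimal sketch of the per-message pattern described above. "Message", "Result",
// "Send" and the timeout value are hypothetical placeholders, not the real library API.
public static class PerMessageSketch
{
    public sealed class Message { public int Id; }
    public sealed class Result { }

    // The socket listener looks up the TaskCompletionSource by message id and completes it.
    private static readonly ConcurrentDictionary<int, TaskCompletionSource<Result>> Pending =
        new ConcurrentDictionary<int, TaskCompletionSource<Result>>();

    private static void Send(Message message) { /* write the message to the socket */ }

    public static async Task SendAllAsync(IReadOnlyList<Message> messages, TimeSpan timeout)
    {
        var wrappers = new List<Task>(messages.Count);

        foreach (var message in messages)
        {
            var tcs = new TaskCompletionSource<Result>();
            Pending[message.Id] = tcs;     // T: completed by the listener later
            Send(message);

            // T2: completes when either the result arrives or the timeout elapses.
            wrappers.Add(Task.WhenAny(tcs.Task, Task.Delay(timeout)));
        }

        // The "main" wait: 50,000 * 2 + 1 tasks end up being created for this.
        await Task.WhenAll(wrappers);
    }

    // Called by the socket listener when the reply for "id" arrives.
    public static void OnResultReceived(int id, Result result)
    {
        if (Pending.TryRemove(id, out var tcs))
            tcs.TrySetResult(result);
    }
}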

However, I can't think of a more lightweight way that accomplishes the same thing. It should use all available cores, be non-blocking, and support timeouts as well.

Is there a way to accomplish the same thing using the Parallel or ThreadPool classes that improves performance?

EDIT: Code showing the basic setup: https://dotnetfiddle.net/gIq2DP

Solution

Start a total of n LongRunning tasks, where n is the number of cores on your machine. Each task should run on one core. It would be wasteful to create 50K new tasks, one for every I that you want to send. Instead, design the tasks to accept an I together with the socket information, i.e. where that information is to be sent.

Create a BlockingCollection<Tuple<I, SocketInfo>>. Start one task to populate this blocking collection. The n long-running tasks you created earlier can keep taking tuples of information plus the address to send it to, and perform the job for you in a loop that ends once the blocking collection is marked as complete.

Timeouts can be handled inside the long-running tasks themselves; one possible way is sketched after the code below.

This entire setup keeps your CPU busy with useful work rather than needlessly busy with the "job" of creating 50K tasks.

Since operations that go beyond main memory (like this network operation) are very slow from the CPU's point of view, feel free to set n not just to the number of cores in your machine but even to three times that value. In the code demonstration below I have set it equal to the number of cores only.

With the code at the provided link, this is one way...

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace TestConsoleApplication
{
    public static class Test
    {
        public static void Main()
        {
            TaskRunningTest();
        }

        private static void TaskRunningTest()
        {
            var s = new Stopwatch();
            const int totalInformationChunks = 50000;
            var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            // Bounded collection: the producer blocks once the capacity is reached.
            var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);

            s.Start();
            // Start a new task to populate "itemsToProcess".
            taskFactory.StartNew(() =>
            {
                // Add tuples of Information and the Address to which that information is to be sent.
                Console.WriteLine("Done initializing all the jobs...");
                // Finally signal that you are done adding.
                itemsToProcess.CompleteAdding();
            });

            // Initializing the base tasks, one per core.
            for (var index = 0; index < baseProcessorTaskArray.Length; index++)
            {
                baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
                {
                    // GetConsumingEnumerable blocks until an item is available and ends
                    // the loop only when CompleteAdding has been called and the collection
                    // is empty, so no items are missed and no consumer exits early.
                    foreach (var item in itemsToProcess.GetConsumingEnumerable())
                    {
                        // Process the item: send item.Item1 (Information) to item.Item2 (Address).
                    }
                });
            }

            // Need to provide the timeout logic here.
            // Depending on what you are trying to achieve with the timeout, you can devise your own way.

            // Wait for the base tasks to drain the collection completely (or time out),
            // then stop the stopwatch.
            Task.WaitAll(baseProcessorTaskArray);
            s.Stop();
            Console.WriteLine(s.ElapsedMilliseconds);
        }

        private class Address
        {
            // This class should hold the socket information.
        }

        private class Information
        {
            // This class holds the Information to send.
        }
    }
}
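
The sample above deliberately leaves the timeout logic open. As one possible way to handle it inside the long-running consumers, each dequeued item could be given its own deadline via a CancellationTokenSource; this is only a sketch, and SendAndReceive, Information and Address below are hypothetical stand-ins, not part of the original code:

using System;
using System.Threading;

// Sketch of per-item timeout handling inside one of the long-running consumer tasks.
// SendAndReceive, Information and Address are hypothetical stand-ins for the real
// socket round trip and data types; only the timeout mechanics are shown here.
internal static class ConsumerTimeoutSketch
{
    internal sealed class Information { }
    internal sealed class Address { }

    private static readonly TimeSpan PerItemTimeout = TimeSpan.FromSeconds(30);

    // Called from inside the consumer loop for each dequeued (Information, Address) pair.
    public static void Process(Information info, Address address)
    {
        // One CancellationTokenSource per item, cancelled automatically after the timeout.
        using (var cts = new CancellationTokenSource(PerItemTimeout))
        {
            try
            {
                SendAndReceive(info, address, cts.Token); // must observe the token
            }
            catch (OperationCanceledException)
            {
                // The item timed out; record a failure for it instead of a result.
            }
        }
    }

    private static void SendAndReceive(Information info, Address address, CancellationToken token)
    {
        // Placeholder for the actual socket I/O, which should throw
        // OperationCanceledException (or return early) when the token is cancelled.
        token.ThrowIfCancellationRequested();
    }
}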
