Best way to limit the number of active Tasks running via the Parallel Task Library


Problem Description


Consider a queue holding a lot of jobs that need processing. The limitation of the queue is that it can only return one job at a time, and there is no way of knowing how many jobs there are. The jobs take about 10 seconds to complete and involve a lot of waiting for responses from web services, so the work is not CPU bound.

If I use something like this

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<

I can't use (I don't think) ParallelOptions.MaxDegreeOfParallelism (http://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.aspx) because I can't use Parallel.Invoke or Parallel.ForEach.
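For context, MaxDegreeOfParallelism only throttles work that is pushed through Parallel.ForEach/Parallel.Invoke over a source you already hold in memory. A minimal sketch of that usual pattern (a hypothetical helper, shown only to illustrate why it doesn't fit a pop-one-at-a-time queue; Job is the question's job type):

using System.Collections.Generic;
using System.Threading.Tasks;

static class ParallelForEachSketch
{
    // Runs an already-materialized set of jobs with at most 8 executing at once.
    static void ProcessAll(IEnumerable<Job> allJobs)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = 8 };
        Parallel.ForEach(allJobs, options, job => job.Execute());
    }
}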

3 alternatives I've found

  1. Replace Task.Factory.StartNew with

    Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
    task.Start();
    

    Which seems to somewhat solve the problem, but I am not clear (http://stackoverflow.com/questions/3488381/does-the-task-parallel-library-or-plinq-take-other-processes-into-account) exactly what this is doing or whether it is the best method.

  2. Create a custom task scheduler that limits the degree of concurrency (a sketch of this option appears just after this list)

  3. Use something like BlockingCollection to add jobs to a collection when they start and remove them when they finish, limiting the number that can be running.

With #1 I've got to trust that the right decision is made automatically; with #2/#3 I've got to work out the maximum number of tasks that can be running myself.
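For #2, one way to get a concurrency-limited scheduler without writing a TaskScheduler subclass from scratch is ConcurrentExclusiveSchedulerPair (a minimal sketch, assuming .NET 4.5; Queue.PopJob and Job are the question's own types). Note that this caps how many tasks execute at once, but the loop still pops and queues every job eagerly, so it does not bound memory the way the producer-consumer version in the EDIT below does:

using System.Threading.Tasks;

static class LimitedSchedulerSketch
{
    const int MaxConcurrency = 8;   // tune empirically, like MAX_CONSUMER_THREADS below

    static void Run()
    {
        // The pair's ConcurrentScheduler runs at most MaxConcurrency tasks at a time.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, MaxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        while (true)
        {
            var job = Queue.PopJob();          // same queue as in the question
            if (job == null)
                break;
            factory.StartNew(job.Execute);     // queued immediately, but only MaxConcurrency run concurrently
        }
    }
}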

Have I understood this correctly - which is the better way, or is there another way?

EDIT - This is what I've come up with from the answers below: a producer-consumer pattern.

As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed, and not to have multiple threads polling the queue (polling is not shown here, but it's a non-blocking operation and would incur huge transaction costs if done at high frequency from multiple places).

// BlockingCollection<>(1) will block if you try to add more than 1 job to the
// collection (no point in being greedy!), and blocks consumers while it is empty.
BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Set up a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically; e.g. with a 4-core CPU and 50% of the
// time in a job spent blocked waiting on IO, 8 is a likely starting point.
for (int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      // GetConsumingEnumerable blocks until an item is available and completes
      // cleanly once CompleteAdding() has been called and the collection is empty
      // (a bare Take() would throw in that situation).
      foreach (var job in jobs.GetConsumingEnumerable())
      {
         job.Execute();
      }
   });
   consumer.Start();
}

// Producer: take items off the queue and put them in the blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompleteAdding();
       // May need to wait for running jobs to finish
       break;
    }
}
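One way to handle the "may need to wait" comment above is to keep references to the consumer threads and join them after the producer loop exits. A minimal continuation of the snippet, assuming the for-loop above stored each consumer in a hypothetical List<Thread> named consumers (not part of the original code):

// 'consumers' is a List<Thread> filled inside the consumer for-loop above.
foreach (Thread consumer in consumers)
   consumer.Join();   // each consumer exits once CompleteAdding() has been called and the collection is drained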

Solution

I just gave an answer (http://stackoverflow.com/questions/11099810/managing-the-tpl-queue/11100423#11100423) which is very applicable to this question.

Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.

You are working with a resource that is not the CPU: waiting for service replies. This means the TPL will mismanage your resource because it assumes CPU-boundedness to a certain degree.

Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.

You can't put unreliable systems into production. For that reason, I recommend #1, but throttled. Don't create as many threads as there are work items. Create only as many threads as are needed to saturate the remote service. Write yourself a helper function that spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.
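A minimal sketch of such a helper, assuming the M work items can be buffered in memory; the names ThrottledRunner/RunThrottled are illustrative, not from the answer:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class ThrottledRunner
{
    // Spawns threadCount threads, lets them drain the work items, then waits for all of them.
    public static void RunThrottled(IEnumerable<Action> workItems, int threadCount)
    {
        var pending = new ConcurrentQueue<Action>(workItems);
        var threads = new List<Thread>();

        for (int i = 0; i < threadCount; i++)
        {
            var t = new Thread(() =>
            {
                Action work;
                while (pending.TryDequeue(out work))   // each thread pulls until nothing is left
                    work();
            });
            t.Start();
            threads.Add(t);
        }

        foreach (var t in threads)
            t.Join();   // block until every work item has been processed
    }
}

// Usage (illustrative): size threadCount to saturate the remote service, not the CPU, e.g.
// ThrottledRunner.RunThrottled(allJobs.Select(j => (Action)j.Execute), threadCount: 8);   // needs System.Linq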
