堵收集过程n项的时间 - 只要继续担任1完成 [英] blocking collection process n items at a time - continuing as soon as 1 is done

查看:246
本文介绍了堵收集过程n项的时间 - 只要继续担任1完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下情形。

  1. 我从数据库50个作业进入阻塞集合。

  2. 每个工作是一个长期运行的。 (潜在的可能)。所以我想在一个单独的线程中运行它们。 (我知道 - 这可能是更好地运行它们的Task.WhenAll,让第三方物流的数字出来 - 但我想控制多少个同时运行)

  3. 说我要同时运行5人(配置)

  4. 我创建了5个任务(TPL),一个为每个作业和并行运行它们。

我想要做的是从第4步完成后尽快拿起阻塞集合的下一个工作的工作之一,并继续下去,直到所有50个完成了。

我想创建一个静态blockingCollection和TaskCompletionSource当作业完成后,将被调用,然后它可以再次致电消费者挑一的工作在队列中的时间。我也想叫异步/等待每个工作 - 但是这是在此之上 - 不知道这对方法产生影响。

这是正确的方式来完成我想要做什么?

类似于<一href="http://stackoverflow.com/questions/6946902/processing-only-n-items-at-a-time-concurrently-using-task-parallel-library">this链接,但美中不足的是,我要尽快的前N个项目之一,完成​​处理下一个作业。毕竟不是N的完成。

更新:

好吧,我有这个code段做正是我想要的,如果有人想以后使用它。正如你可以看到下面,5个线程创建和每个线程开始下一个工作时,它与目前的完成。仅5线程是活动的,在任何给定的时间。我理解这可能无法正常工作100%,这样总的,如果使用一个CPU /核心将有上下文切换的性能问题。

  VAR块=新ActionBlock&LT;工作&GT;(
                工作=&GT; Handler.HandleJob(工作),
                    新ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5});

              的foreach(在GetJobs工作j())
                  block.SendAsync(J);
 

  

工作2开始螺纹:13。等待时间:3600000ms。时间:2014年8月29日   下午3时14分43秒

     

工作4开始螺纹:14。等待时间:15000ms。时间:2014年8月29日   下午3时14分43秒

     

作业0开始螺纹:7。等待时间:600000ms。时间:2014年8月29日   下午3时14分43秒

     

招聘1日开始执行的线程:12。等待时间:900000ms。时间:2014年8月29日   下午3时14分43秒

     

工作3开始螺纹:11。等待时间:120000ms。时间:2014年8月29日   下午3时14分43秒

     

工作4完成对螺纹:14。 2014年8月29日下午3时14分58秒

     

工作5开始螺纹:14。等待时间:1800000ms。时间:2014年8月29日   下午三点14分58秒

     

作业3完成对螺纹:11。 2014年8月29日下午三点十六分43秒

     

工作6开始螺纹:11。等待时间:1200000ms。时间:2014年8月29日   下午3时16分43秒

     

作业0完成上线:7。 2014年8月29日下午3点24分43秒

     

工作7开始螺纹:7。等待时间:30000ms。时间:2014年8月29日3时24分43秒   PM

     

作业7完成对螺纹:7。 2014年8月29日下午3点25分13秒

     

工作8日开始上线:7。等待时间:100000ms。时间:2014年8月29日   下午3时25分十三秒

     

工作8完成上线:7。 2014年8月29日下午3时26分53秒

     

工作9开始螺纹:7。等待时间:900000ms。时间:2014年8月29日   下午3时26分53秒

     

任务1完成对螺纹:12。 2014年8月29日下午3时29分43秒

     

工作10开始螺纹:12。等待时间:300000ms。时间:2014年8月29日   下午3时29分43秒

     

工作10完成对螺纹:12。 2014年8月29日下午3时34分43秒

     

工作11开始螺纹:12。等待时间:600000ms。时间:2014年8月29日   下午三时34分43秒

     

工作6完成对螺纹:11。 2014年8月29日下午3时36分43秒

     

12号作业开始螺纹:11。等待时间:300000ms。时间:2014年8月29日   下午3点36分43秒

     

12号作业完成上线:11。 2014年8月29日下午3时41分43秒

     

工作13日开始执行的线程:11。等待时间:100000ms。时间:2014年8月29日   下午三时41分43秒

     

工作9完成对螺纹:7。 2014年8月29日下午3点41分53秒

     

工作14开始螺纹:7。等待时间:300000ms。时间:2014年8月29日   下午3时41分53秒

     

工作13完成对螺纹:11。 2014年8月29日下午3时43分23秒

     

工作11完成对螺纹:12。 2014年8月29日下午3时44分43秒

     

5的工作完了就螺纹:14。 2014年8月29日下午3时44分58秒

     

工作14完成对螺纹:7。 2014年8月29日下午三时46分53秒

     

2的工作完了就螺纹:13。 2014年8月29日下午4点14分43秒

解决方案

您可以轻松地,你需要使用的 TPL数据流

你可以做的是使用 BufferBlock&LT; T&GT; ,这是一个用于存储你的数据的缓冲区,并用的 ActionBlock&LT; T&GT; 这将消耗这些请求,因为他们再从未来的 BufferBlock&LT; T&GT;

现在,这里的美妙之处在于,你可以指定你要多少请求 ActionBlock&LT; T&GT; 来使用并行处理的<一个href="http://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.executiondataflowblockoptions(v=vs.110).aspx"相对=nofollow> ExecutionDataflowBlockOptions 类。

下面是一个简单的控制台版本,其处理一堆数字,因为它们可以进来,打印他们的名字和 Thread.ManagedThreadID

 私有静态无效的主要(字串[] args)
{
    VAR bufferBlock =新BufferBlock&LT;诠释&GT;();

    VAR actionBlock =
        新ActionBlock&LT; INT&GT;(I =&GT; Console.WriteLine(读数{0}在线程{1},
                                  我,Thread.CurrentThread.ManagedThreadId)
                             新ExecutionDataflowBlockOptions
                                 {MaxDegreeOfParallelism = 5});

    bufferBlock.LinkTo(actionBlock);
    生产(bufferBlock);

    Console.ReadKey();
}

私有静态无效农产品(BufferBlock&LT; INT&GT; bufferBlock)
{
    的foreach(在Enumerable.Range变种NUM(0,500))
    {
        bufferBlock.Post(NUM);
    }
}
 

您也可以在需要时张贴异步,使用awaitable的 BufferBlock.SendAsync

这样的话,你让太平人寿处理所有的限制你,而无需做手工。

I have the following Scenario.

  1. I take 50 jobs from the database into a blocking collection.

  2. Each job is a long running one. (potentially could be). So I want to run them in a separate thread. (I know - it may be better to run them as Task.WhenAll and let the TPL figure it out - but I want to control how many runs simultaneously)

  3. Say I want to run 5 of them simultaneously (configurable)

  4. I create 5 tasks (TPL), one for each job and run them in parallel.

What I want to do is to pick up the next Job in the blocking collection as soon as one of the jobs from step 4 is complete and keep going until all 50 are done.

I am thinking of creating a Static blockingCollection and a TaskCompletionSource which will be invoked when a job is complete and then it can call the consumer again to pick one job at a time from the queue. I would also like to call async/await on each job - but that's on top of this - not sure if that has an impact on the approach.

Is this the right way to accomplish what I'm trying to do?

Similar to this link, but catch is that I want to process the next Job as soon as one of the first N items are done. Not after all N are done.

Update :

Ok, I have this code snippet doing exactly what I want, if someone wants to use it later. As you can see below, 5 threads are created and each thread starts the next job when it is done with current. Only 5 threads are active at any given time. I understand this may not work 100% like this always, and will have performance issues of context switching if used with one cpu/core.

var block = new ActionBlock<Job>(
                job => Handler.HandleJob(job), 
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

              foreach (Job j in GetJobs())
                  block.SendAsync(j);

Job 2 started on thread :13. wait time:3600000ms. Time:8/29/2014 3:14:43 PM

Job 4 started on thread :14. wait time:15000ms. Time:8/29/2014 3:14:43 PM

Job 0 started on thread :7. wait time:600000ms. Time:8/29/2014 3:14:43 PM

Job 1 started on thread :12. wait time:900000ms. Time:8/29/2014 3:14:43 PM

Job 3 started on thread :11. wait time:120000ms. Time:8/29/2014 3:14:43 PM

job 4 finished on thread :14. 8/29/2014 3:14:58 PM

Job 5 started on thread :14. wait time:1800000ms. Time:8/29/2014 3:14:58 PM

job 3 finished on thread :11. 8/29/2014 3:16:43 PM

Job 6 started on thread :11. wait time:1200000ms. Time:8/29/2014 3:16:43 PM

job 0 finished on thread :7. 8/29/2014 3:24:43 PM

Job 7 started on thread :7. wait time:30000ms. Time:8/29/2014 3:24:43 PM

job 7 finished on thread :7. 8/29/2014 3:25:13 PM

Job 8 started on thread :7. wait time:100000ms. Time:8/29/2014 3:25:13 PM

job 8 finished on thread :7. 8/29/2014 3:26:53 PM

Job 9 started on thread :7. wait time:900000ms. Time:8/29/2014 3:26:53 PM

job 1 finished on thread :12. 8/29/2014 3:29:43 PM

Job 10 started on thread :12. wait time:300000ms. Time:8/29/2014 3:29:43 PM

job 10 finished on thread :12. 8/29/2014 3:34:43 PM

Job 11 started on thread :12. wait time:600000ms. Time:8/29/2014 3:34:43 PM

job 6 finished on thread :11. 8/29/2014 3:36:43 PM

Job 12 started on thread :11. wait time:300000ms. Time:8/29/2014 3:36:43 PM

job 12 finished on thread :11. 8/29/2014 3:41:43 PM

Job 13 started on thread :11. wait time:100000ms. Time:8/29/2014 3:41:43 PM

job 9 finished on thread :7. 8/29/2014 3:41:53 PM

Job 14 started on thread :7. wait time:300000ms. Time:8/29/2014 3:41:53 PM

job 13 finished on thread :11. 8/29/2014 3:43:23 PM

job 11 finished on thread :12. 8/29/2014 3:44:43 PM

job 5 finished on thread :14. 8/29/2014 3:44:58 PM

job 14 finished on thread :7. 8/29/2014 3:46:53 PM

job 2 finished on thread :13. 8/29/2014 4:14:43 PM

解决方案

You can easily achieve what you need using TPL Dataflow.

What you can do is use BufferBlock<T>, which is a buffer for storing you data, and link it together with an ActionBlock<T> which will consume those requests as they're coming in from the BufferBlock<T>.

Now, the beauty here is that you can specify how many requests you want the ActionBlock<T> to handle concurrently using the ExecutionDataflowBlockOptions class.

Here's a simplified console version, which processes a bunch of numbers as they're coming in, prints their name and Thread.ManagedThreadID:

private static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();

    var actionBlock =
        new ActionBlock<int>(i => Console.WriteLine("Reading number {0} in thread {1}",
                                  i, Thread.CurrentThread.ManagedThreadId),
                             new ExecutionDataflowBlockOptions 
                                 {MaxDegreeOfParallelism = 5});

    bufferBlock.LinkTo(actionBlock);
    Produce(bufferBlock);

    Console.ReadKey();
}

private static void Produce(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        bufferBlock.Post(num);
    }
}

You can also post them asynchronously if needed, using the awaitable BufferBlock.SendAsync

That way, you let the TPL handle all the throttling for you without needing to do it manually.

这篇关于堵收集过程n项的时间 - 只要继续担任1完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆