与工作线程去队列中的项目 [英] De-queue Items with worker threads

查看:196
本文介绍了与工作线程去队列中的项目的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在试图找出如何解决一个要求,我,但对我的生活我只是不能拿出一个解决方案。

I have been trying to figure out how to solve an requirement I have but for the life of me I just can't come up with a solution.

我有一个项目数据库,其中存储了一种队列。
(数据库已经实施,其他进程将项目添加到这个队列中。)

I have a database of items which stores them a kind of queue. (The database has already been implemented and other processes will be adding items to this queue.)

该项目需要大量的工作/时间的过程所以我需要能够:
从数据库中不断去队列项目。
。对于每个项目上运行一个新的线程和进程的项目,然后返回true / false时它已成功处理。 (这将被用来将其重新添加到数据库队列或无法)

The items require a lot of work/time to "process" so I need to be able to: Constantly de-queue items from the database. For each item run a new thread and process the item and then return true/false it it was successfully processed. (this will be used to re-add it to the database queue or not)

但只做到这一点而被处理的活动线程(每个项目之一,目前一些)为小于线程参数的最大数目。

But to only do this while the current number of active threads (one per item being processed) is less then a maximum number of threads parameter.

在线程的最大数量已达到我需要停止去排队从数据库项,直到当前数线程是小于的最大线程数。
在这一点上它需要继续去排队的项目。

Once the maximum number of threads has been reached I need to stop de-queuing items from the database until the current number of threads is less than the maximum number of threads. At which point it needs to continue de-queuing items.

这感觉就像这应该是我能想出,但它只是不来。我

It feels like this should be something I can come up with but it is just not coming to me.

要澄清:我只需要实现线程。该数据库已经实现。

To clarify: I only need to implement the threading. The database has already be implemented.

推荐答案

一个非常简单的方法做,这是一个信号灯。你有一个线程队列中取出项目,并创建线程来处理它们。例如:

One really easy way to do this is with a Semaphore. You have one thread that dequeues items and creates threads to process them. For example:

const int MaxThreads = 4;
Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
while (Queue.HasItems())
{
    sem.WaitOne();
    var item = Queue.Dequeue();
    Threadpool.QueueUserWorkItem(ProcessItem, item); // see below
}
// When the queue is empty, you have to wait for all processing
// threads to complete.
// If you can acquire the semaphore MaxThreads times, all workers are done
int count = 0;
while (count < MaxThreads)
{
    sem.WaitOne();
    ++count;
}

// the code to process an item
void ProcessItem(object item)
{
    // cast the item to whatever type you need,
    // and process it.
    // when done processing, release the semaphore
    sem.Release();
}



以上技术工作得很好。这是简单的代码,很容易理解,也很有效。

The above technique works quite well. It's simple to code, easy to understand, and very effective.

一个变化是,你可能要使用工作 API,而 Threadpool.QueueUserWorkItem 工作为您提供了异步处理更多的控制,包括取消。我用 QueueUserWorkItem 在我的例子,因为我更熟悉它。我会在生产程序中使用工作

One change is that you might want to use the Task API rather Threadpool.QueueUserWorkItem. Task gives you more control over the asynchronous processing, including cancellation. I used QueueUserWorkItem in my example because I'm more familiar with it. I would use Task in a production program.

虽然这并用N + 1个线程(其中N是项目你想同时处理)的数量,即额外的线程是不是经常做任何事情。它运行的唯一情况是当它的分配工作到工作线程。否则,它做的旗语非忙等待。

Although this does use N+1 threads (where N is the number of items you want processed concurrently), that extra thread isn't often doing anything. The only time it's running is when it's assigning work to worker threads. Otherwise, it's doing a non-busy wait on the semaphore.

这篇关于与工作线程去队列中的项目的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆