使用工作线程将项目出队 [英] De-queue Items with worker threads

查看:27
本文介绍了使用工作线程将项目出队的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在想办法解决我的需求,但在我的一生中,我就是想不出一个解决方案.

I have been trying to figure out how to solve an requirement I have but for the life of me I just can't come up with a solution.

我有一个项目数据库,将它们存储为一种队列.(数据库已经实现,其他进程会向这个队列添加项目.)

I have a database of items which stores them a kind of queue. (The database has already been implemented and other processes will be adding items to this queue.)

这些项目需要大量的工作/时间来处理",所以我需要能够:不断地从数据库中取出项目.对于每个项目运行一个新线程并处理该项目,然后返回真/假它已成功处理.(这将用于将其重新添加到数据库队列中)

The items require a lot of work/time to "process" so I need to be able to: Constantly de-queue items from the database. For each item run a new thread and process the item and then return true/false it it was successfully processed. (this will be used to re-add it to the database queue or not)

但仅在当前活动线程数(每个正在处理的项目一个)小于最大线程数参数时执行此操作.

But to only do this while the current number of active threads (one per item being processed) is less then a maximum number of threads parameter.

一旦达到最大线程数,我需要停止从数据库中取出项目,直到当前线程数小于最大线程数.此时它需要继续对项目进行出队.

Once the maximum number of threads has been reached I need to stop de-queuing items from the database until the current number of threads is less than the maximum number of threads. At which point it needs to continue de-queuing items.

感觉这应该是我可以想出的东西,但它只是不来找我.

It feels like this should be something I can come up with but it is just not coming to me.

澄清:我只需要实现线程.数据库已经实现.

To clarify: I only need to implement the threading. The database has already be implemented.

推荐答案

一个非常简单的方法是使用 Semaphore.您有一个线程出列项目并创建线程来处理它们.例如:

One really easy way to do this is with a Semaphore. You have one thread that dequeues items and creates threads to process them. For example:

const int MaxThreads = 4;
Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
while (Queue.HasItems())
{
    sem.WaitOne();
    var item = Queue.Dequeue();
    Threadpool.QueueUserWorkItem(ProcessItem, item); // see below
}
// When the queue is empty, you have to wait for all processing
// threads to complete.
// If you can acquire the semaphore MaxThreads times, all workers are done
int count = 0;
while (count < MaxThreads)
{
    sem.WaitOne();
    ++count;
}

// the code to process an item
void ProcessItem(object item)
{
    // cast the item to whatever type you need,
    // and process it.
    // when done processing, release the semaphore
    sem.Release();
}

上述技术效果很好.编码简单、易于理解且非常有效.

The above technique works quite well. It's simple to code, easy to understand, and very effective.

一个变化是您可能想要使用 Task API 而不是 Threadpool.QueueUserWorkItem.Task 使您可以更好地控制异步处理,包括取消.我在示例中使用了 QueueUserWorkItem,因为我更熟悉它.我会在生产程序中使用 Task.

One change is that you might want to use the Task API rather Threadpool.QueueUserWorkItem. Task gives you more control over the asynchronous processing, including cancellation. I used QueueUserWorkItem in my example because I'm more familiar with it. I would use Task in a production program.

尽管这确实使用了 N+1 个线程(其中 N 是您要同时处理的项目数),但该额外线程通常不会执行任何操作.它运行的唯一时间是将工作分配给工作线程.否则,它将在信号量上进行非忙等待.

Although this does use N+1 threads (where N is the number of items you want processed concurrently), that extra thread isn't often doing anything. The only time it's running is when it's assigning work to worker threads. Otherwise, it's doing a non-busy wait on the semaphore.

这篇关于使用工作线程将项目出队的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆