生产者/消费者工作队列 [英] producer/consumer work queues

查看:182
本文介绍了生产者/消费者工作队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在用最好的方式来实现我的处理管道。

I'm wrestling with the best way to implement my processing pipeline.

我的生产者将工作送到BlockingQueue。在消费者方面,我轮询队列,包装我在一个Runnable任务中获得的,并提交到一个ExecutorService。

My producers feed work to a BlockingQueue. On the consumer side, I poll the queue, wrap what I get in a Runnable task, and submit it to an ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

这不太理想; ExecutorService有自己的队列,当然,真正发生的是,我总是完全耗尽我的工作队列和填充任务队列,随着任务完成缓慢清空。

This is not ideal; the ExecutorService has its own queue, of course, so what's really happening is that I'm always fully draining my work queue and filling the task queue, which slowly empties as the tasks complete.

我意识到我可以在生产者端排队任务,但我真的不想这样做 - 我喜欢我的工作队列的间接/隔离是哑串;它真的不是生产商的任何业务会发生在他们身上。强制生产者排队一个Runnable或Callable打破一个抽象,IMHO。

I realize that I could queue tasks at the producer end, but I'd really rather not do that - I like the indirection/isolation of my work queue being dumb strings; it really isn't any business of the producer what's going to happen to them. Forcing the producer to queue a Runnable or Callable breaks an abstraction, IMHO.

但我想要共享工作队列来表示当前的处理状态。

But I do want the shared work queue to represent the current processing state. I want to be able to block the producers if the consumers aren't keeping up.

我想要使用Executors,但是我觉得我打击他们的设计。我可以喝一部分的Kool-ade,还是我必须喝?我在拒绝排队任务方面错了吗? (我怀疑我可以设置ThreadPoolExecutor使用一个1任务队列,并重写它的执行方法来阻止而不是队列充满拒绝,但感觉毛。)

I'd love to use Executors, but I feel like I'm fighting their design. Can I partially drink the Kool-ade, or do I have to gulp it? Am I being wrong-headed in resisting queueing tasks? (I suspect I could set up ThreadPoolExecutor to use a 1-task queue and override it's execute method to block rather than reject-on-queue-full, but that feels gross.)

建议?

推荐答案


我想要共享工作队列
表示当前处理
状态。

I want the shared work queue to represent the current processing state.

尝试使用共享 BlockingQueue ,并且有一个Worker线程将工作项从队列中移除。

Try using a shared BlockingQueue and have a pool of Worker threads taking work items off of the Queue.


如果消费者不是
,我想能够阻止
生产者。

I want to be able to block the producers if the consumers aren't keeping up.

两者 ArrayBlockingQueue LinkedBlockingQueue 支持有界队列,以便它们在完全阻塞。使用屏蔽 put ()方法确保如果队列已满,生产者将被阻塞。

Both ArrayBlockingQueue and LinkedBlockingQueue support bounded queues such that they will block on put when full. Using the blocking put() methods ensures that producers are blocked if the queue is full.

这是一个粗略的开始。您可以调整工作数量和队列大小:

Here is a rough start. You can tune the number of workers and queue size:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

这篇关于生产者/消费者工作队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆