阻塞队列和多线程消费者,如何知道何时停止 [英] Blocking queue and multi-threaded consumer, how to know when to stop

查看:166
本文介绍了阻塞队列和多线程消费者,如何知道何时停止的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个单线程生成器,它创建了一些任务对象,然后将它们添加到 ArrayBlockingQueue (具有固定大小)。

I have a single thread producer which creates some task objects which are then added into an ArrayBlockingQueue (which is of fixed size).

我还启动了一个多线程消费者。这是一个固定的线程池( Executors.newFixedThreadPool(threadCount); )。然后我向这个threadPool提交了一些ConsumerWorker入口,每个ConsumerWorker都对上面提到的ArrayBlockingQueue实例进行了引用。

I also start a multi-threaded consumer. This is build as a fixed thread pool (Executors.newFixedThreadPool(threadCount);). I then submit some ConsumerWorker intances to this threadPool, each ConsumerWorker having a refference to the above mentioned ArrayBlockingQueue instance.

每个这样的工作人员都会在队列上执行 take()并处理该任务。

Each such Worker will do a take() on the queue and deal with the task.

我的问题是,当没有更多的工作要做时,让工人知道的最佳方法是什么。换句话说,我如何告诉Workers,生产者已经完成了对队列的添加,从这一点开始,每个工人都应该在他看到队列为空时停止。

My issue is, what's the best way to have a Worker know when there won't be any more work to be done. In other words, how do I tell the Workers that the producer has finished adding to the queue, and from this point on, each worker should stop when he sees that the Queue is empty.

我现在所拥有的是一个设置,其中我的Producer初始化了一个回调,当他完成它的工作(向队列中添加东西)时会触发回调。我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表。当Producer Callback告诉我生产者已完成时,我可以告诉每个工人。此时,他们应该继续检查队列是否为空,当它变为空时,它们应该停止,从而允许我优雅地关闭ExecutorService线程池。它是这样的

What I've got now is a setup where my Producer is initialized with a callback which is triggered when he finishes it's job (of adding stuff to the queue). I also keep a list of all the ConsumerWorkers I've created and submitted to the ThreadPool. When the Producer Callback tells me that the producer is done, I can tell this to each of the workers. At this point they should simply keep checking if the queue is not empty, and when it becomes empty they should stop, thus allowing me to gracefully shutDown the ExecutorService thread pool. It's something like this

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是我有一个明显的竞争条件,有时生产者将完成,发出信号,消费者工作者将在消耗队列中的所有内容之前停止。

The problem here is that I have an obvious race condition where sometimes the producer will finish, signal it, and the ConsumerWorkers will stop BEFORE consuming everything in the queue.

我的问题什么是同步这个的最佳方式,以便一切正常?我应该同步整个部分来检查生产者是否正在运行加上如果队列是空的加上从队列中取出一些块(在队列对象上)?我应该只在ConsumerWorker实例上同步 isRunning 布尔的更新吗?还有其他任何建议吗?

My question is what's the best way to synchronize this so that it all works ok? Should I synchronize the whole part where it checks if the producer is running plus if the queue is empty plus take something from the queue in one block (on the queue object)? Should I just synchronize the update of the isRunning boolean on the ConsumerWorker instance? Any other suggestion?

更新,这是我最终使用的工作实施:

UPDATE, HERE'S THE WORKING IMPLEMENTATION THAT I'VE ENDED UP USING:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这很大程度上受到了JohnVint在下面的回答的启发,只做了一些小修改。

This was inspired heavily by JohnVint's answer below, with only some minor modifications.

===由于@ vendhan的评论而更新。

=== Update due to @vendhan's comment.

感谢您的观察。你是对的,这个问题中的第一段代码(除其他问题外)是而(isRunning ||!inputQueue.isEmpty())没有真的很有道理。

Thank you for your obeservation. You are right, the first snippet of code in this question has (amongst other issues) the one where the while(isRunning || !inputQueue.isEmpty()) doesn't really make sense.

在我的实际最终实现中,我做了一些更接近你更换||的建议。 (或)与&& (和),从某种意义上说,每个工人(消费者)现在只检查他从列表中获得的元素是否是毒丸,如果是这样,那么(理论上我们可以说工人必须运行并且队列必须不是空的。)

In my actual final implementation of this, I do something which is closer to your suggestion of replacing "||" (or) with "&&" (and), in the sense that each worker (consumer) now only checks if the element he's got from the list is a poison pill, and if so stops (so theoretically we can say that the worker has to be running AND the queue must not be empty).

推荐答案

你应该继续 take()从队列中。你可以使用毒丸告诉工人停止。例如:

You should continue to take() from the queue. You can use a poison pill to tell the worker to stop. For example:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

这篇关于阻塞队列和多线程消费者,如何知道何时停止的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆