如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待? [英] How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
问题描述
我从队列服务器获取数据,我需要处理它并发送确认。这样的事情:
I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:
while (true) {
queueserver.get.data
ThreadPoolExecutor //send data to thread
queueserver.acknowledgement
我不完全理解线程中会发生什么我认为这个程序获取数据,发送它的线程,然后立即确认它。因此,即使我有一个每个队列的限制只能有200个未确认的项目,它只会拉得尽可能快。当我在单个服务器上编写程序时,这很好,但如果我使用多个工作程序,那么这就成了一个问题,因为线程队列中的项目数量不是它完成的工作的反映,而是它的速度有多快可以从队列服务器获取项目。
I don't fully understand what happens in threads but I think this program gets the data, sends it the thread and then immediately acknowledges it. So even if I have a limit of each queue can only have 200 unacknowledged items, it will just pull as fast as it can receive it. This is good when I write a program on a single server, but if I'm using multiple workers then this becomes an issue because the amount of items in the thread queue are not a reflection of the work its done but instead of how fast it can get items from the queue server.
如果线程队列充满工作,我能以某种方式让程序等待吗?
Is there anything I can do to somehow make the program wait if the thread queue is full of work?
推荐答案
如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?
How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
我不是百分百肯定我在这里理解你的问题。当然,不是开放式队列,你可以使用 BlockingQueue
并对其进行限制:
I'm not 100% sure that I'm understanding your problem here. Certainly instead of an open ended queue, you can use a BlockingQueue
with a limit on it:
BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
就提交给 ExecutorService的工作而言
,而不是使用使用 Executors
创建的默认 ExecutorService
,它使用无界队列,您可以创建自己的:
In terms of jobs submitted to an ExecutorService
, instead of using the default ExecutorService
s created using Executors
, which use an unbounded queue, you can create your own:
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(200));
队列填满后,将导致它拒绝任何提交的新任务。您需要设置提交到队列的 RejectedExecutionHandler
。类似于:
Once the queue fills up, it will cause it to reject any new tasks that are submitted. You will need to set a RejectedExecutionHandler
that submits to the queue. Something like:
final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will block if the queue is full
executor.getQueue().put(r);
}
});
我认为Java没有 ThreadPoolExecutor是一个主要的错过。 CallerBlocksPolicy
。
这篇关于如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!