如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待? [英] How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

查看:119
本文介绍了如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从队列服务器获取数据,我需要处理它并发送确认。这样的事情:

I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:

while (true) {
    queueserver.get.data
    ThreadPoolExecutor //send data to thread
    queueserver.acknowledgement 

我不完全理解线程中会发生什么我认为这个程序获取数据,发送它的线程,然后立即确认它。因此,即使我有一个每个队列的限制只能有200个未确认的项目,它只会拉得尽可能快。当我在单个服务器上编写程序时,这很好,但如果我使用多个工作程序,那么这就成了一个问题,因为线程队列中的项目数量不是它完成的工作的反映,而是它的速度有多快可以从队列服务器获取项目。

I don't fully understand what happens in threads but I think this program gets the data, sends it the thread and then immediately acknowledges it. So even if I have a limit of each queue can only have 200 unacknowledged items, it will just pull as fast as it can receive it. This is good when I write a program on a single server, but if I'm using multiple workers then this becomes an issue because the amount of items in the thread queue are not a reflection of the work its done but instead of how fast it can get items from the queue server.

如果线程队列充满工作,我能以某种方式让程序等待吗?

Is there anything I can do to somehow make the program wait if the thread queue is full of work?

推荐答案


如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

我不是百分百肯定我在这里理解你的问题。当然,不是开放式队列,你可以使用 BlockingQueue 并对其进行限制:

I'm not 100% sure that I'm understanding your problem here. Certainly instead of an open ended queue, you can use a BlockingQueue with a limit on it:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);

就提交给 ExecutorService的工作而言 ,而不是使用使用 Executors 创建的默认 ExecutorService ,它使用无界队列,您可以创建自己的:

In terms of jobs submitted to an ExecutorService, instead of using the default ExecutorServices created using Executors, which use an unbounded queue, you can create your own:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

队列填满后,将导致它拒绝任何提交的新任务。您需要设置提交到队列的 RejectedExecutionHandler 。类似于:

Once the queue fills up, it will cause it to reject any new tasks that are submitted. You will need to set a RejectedExecutionHandler that submits to the queue. Something like:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

我认为Java没有 ThreadPoolExecutor是一个主要的错过。 CallerBlocksPolicy

这篇关于如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆