创建动态(增长/缩小)线程池 [英] Creating a dynamic (growing/shrinking) thread pool

查看:152
本文介绍了创建动态(增长/缩小)线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要在Java中实现一个线程池(java.util.concurrent),当空闲时,线程数量处于某个最小值,当作业提交到线程中时,增长到上限(但绝不会更远)他们完成执行,并在完成所有工作并且不再提交任何工作时收缩回到下限。

I need to implement a thread pool in Java (java.util.concurrent) whose number of threads is at some minimum value when idle, grows up to an upper bound (but never further) when jobs are submitted into it faster than they finish executing, and shrinks back to the lower bound when all jobs are done and no more jobs are submitted.

你会如何实现类似的工作?我想这将是一个相当常见的使用场景,但显然 java.util.concurrent.Executors 工厂方法只能创建固定大小的池和池,当无限增长时提交了很多工作。 ThreadPoolExecutor 类提供 corePoolSize maximumPoolSize 参数,但是文档似乎意味着同时拥有超过 corePoolSize 线程的唯一方法是使用有界作业队列,在这种情况下,如果你已经达到 maximumPoolSize 主题,你会得到拒绝工作,你必须自己处理?我想出了这个:

How would you implement something like that? I imagine that this would be a fairly common usage scenario, but apparently the java.util.concurrent.Executors factory methods can only create fixed-size pools and pools that grow unboundedly when many jobs are submitted. The ThreadPoolExecutor class provides corePoolSize and maximumPoolSize parameters, but its documentation seems to imply that the only way to ever have more than corePoolSize threads at the same time is to use a bounded job queue, in which case, if you've reached maximumPoolSize threads, you'll get job rejections which you have to deal with yourself? I came up with this:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

我忽略了什么吗?有一个更好的方法吗?此外,根据一个人的要求,上述代码至少(我认为)(工作总数) - maxSize 工作完成后,上述代码可能无法完成。因此,如果您希望能够将任意数量的作业提交到池中并立即进行而无需等待其中任何一个完成,我看不出如何在没有专门的作业总结线程的情况下执行此操作保存所有提交的作业所需的无限队列。 AFAICS,如果你为ThreadPoolExecutor本身使用一个无界的队列,它的线程数将永远不会超过corePoolSize。

Am I overlooking something? Is there a better way to do this? Also, depending on one's requirements, it might be problematic that the above code will not finish until at least (I think) (total number of jobs) - maxSize jobs have finished. So if you want to be able to submit an arbitrary number of jobs into the pool and proceed immediately without waiting for any of them to finish, I don't see how you could do that without having a dedicated "job sumitting" thread that manages the required unbounded queue to hold all the submitted jobs. AFAICS, if you're using an unbounded queue for the ThreadPoolExecutor itself, its thread count will never grow beyond corePoolSize.

推荐答案

一个可能对您有帮助的技巧是分配一个 RejectedExecutionHandler ,它使用相同的线程将作业提交到阻塞队列。这将阻止当前线程并删除某种循环的需要。

One trick that might help you is to assign a RejectedExecutionHandler that uses the same thread to submit the job into the blocking queue. That will block the current thread and remove the need for some sort of loop.

请在此处查看我的答案:

See my answer here:


如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

这是从该答案中复制的拒绝处理程序。

Here's the rejection handler copied from that answer.

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

只要你意识到这一点,你就应该能够利用核心/最大线程数在核心线程上方创建任何线程之前,您使用 first 的有界阻塞队列将填满。因此,如果您有10个核心线程并且您希望第11个作业启动第11个线程,那么您将需要一个大小为0的阻塞队列(可能是 SynchronousQueue ) 。我觉得这是一个非常好的 ExecutorService 类的真正限制。

You should then be able to make use of the core/max thread count as long as you realize that the bounded blocking queue that you use first fills up before any threads are created above the core threads. So if you have 10 core threads and you want the 11th job to start the 11th thread you will need to have a blocking queue with a size of 0 unfortunately (maybe a SynchronousQueue). I feel that this is a real limitation in the otherwise great ExecutorService classes.

这篇关于创建动态(增长/缩小)线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆