How to get the ThreadPoolExecutor to increase threads to max before queueing?


Question


    I've been frustrated for some time with the default behavior of ThreadPoolExecutor which backs the ExecutorService thread-pools that so many of us use. To quote from the Javadocs:

    If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

    What this means is that if you define a thread pool with the following code, it will never start the 2nd thread because the LinkedBlockingQueue is unbounded.

    ExecutorService threadPool =
       new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
          TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
    

    Only if you have a bounded queue and the queue is full are any threads above the core number started. I suspect a large number of junior Java multithreaded programmers are unaware of this behavior of the ThreadPoolExecutor.
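    This behavior is easy to demonstrate. In the sketch below, a latch keeps the single core thread blocked so the pool size can be observed while tasks pile up:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultBehaviorDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same pool as above: 1 core, 50 max, unbounded queue.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 50, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch hold = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            tpe.execute(() -> {
                try { hold.await(); } catch (InterruptedException ignored) { }
            });
        }
        // 10 pending tasks and a max of 50, yet only the core thread was
        // started, because the unbounded queue accepted every offer(...).
        System.out.println("pool size: " + tpe.getPoolSize()); // pool size: 1
        hold.countDown();
        tpe.shutdown();
    }
}
```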

    Now I have a specific use case where this is not optimal. I'm looking for ways to work around it without writing my own TPE class.

    My requirements are for a web service that is making call-backs to a possibly unreliable 3rd party.

    • I don't want to make the call-back synchronously with the web-request, so I want to use a thread-pool.
    • I typically get a couple of these a minute so I don't want to have a newFixedThreadPool(...) with a large number of threads that mostly are dormant.
    • Every so often I get a burst of this traffic and I want to scale up the number of threads to some max value (let's say 50).
    • I need to make a best attempt to do all callbacks so I want to queue up any additional ones above 50. I don't want to overwhelm the rest of my web-server by using a newCachedThreadPool().

    How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started? How can I get it to start more threads before queuing tasks?

    Edit:

    @Flavio makes a good point about using the ThreadPoolExecutor.allowCoreThreadTimeOut(true) to have the core threads timeout and exit. I considered that but I still wanted the core-threads feature. I did not want the number of threads in the pool to drop below the core-size if possible.
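    For contrast, a minimal sketch of that allowCoreThreadTimeOut(true) alternative: making all 50 threads core threads gives the scale-up-before-queueing behavior, at the cost that the pool can shrink to zero when idle, which is exactly the trade-off weighed here:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutPool {
    public static ThreadPoolExecutor create() {
        // core == max, so the TPE forks up to 50 threads before ever queueing;
        // allowCoreThreadTimeOut lets idle "core" threads die after 60s.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(50, 50, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        return tpe;
    }
}
```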

    Solution

    How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started.

    I believe I have finally found a somewhat elegant (maybe a little hacky) solution to this limitation with ThreadPoolExecutor. It involves extending LinkedBlockingQueue to have it return false for queue.offer(...) when there are already some tasks queued. If the current threads are not keeping up with the queued tasks, the TPE will add additional threads. If the pool is already at max threads, then the RejectedExecutionHandler will be called. It is the handler which then does the put(...) into the queue.

    It certainly is strange to write a queue where offer(...) can return false and put() never blocks so that's the hack part. But this works well with TPE's usage of the queue so I don't see any problem with doing this.

    Here's the code:

    // extend LinkedBlockingQueue to force offer() to return false conditionally
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
        private static final long serialVersionUID = -6903933921423432194L;
        @Override
        public boolean offer(Runnable e) {
            /*
             * Offer it to the queue if there is 0 items already queued, else
             * return false so the TPE will add another thread. If we return false
             * and max threads have been reached then the RejectedExecutionHandler
             * will be called which will do the put into the queue.
             */
            if (size() == 0) {
                return super.offer(e);
            } else {
                return false;
            }
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
            60 /*secs*/, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                /*
                 * This does the actual put into the queue. Once the max threads
                 * have been reached, the tasks will then queue up.
                 */
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    });
    

    With this mechanism, when I submit tasks to the queue, the ThreadPoolExecutor will:

    1. Scale the number of threads up to the core size initially (here 1).
    2. Offer it to the queue. If the queue is empty it will be queued to be handled by the existing threads.
    3. If the queue has 1 or more elements already, the offer(...) will return false.
    4. If false is returned, scale up the number of threads in the pool until they reach the max number (here 50).
    5. If at the max, it calls the RejectedExecutionHandler.
    6. The RejectedExecutionHandler then puts the task into the queue to be processed by the first available thread in FIFO order.
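    The steps above can be exercised with a small demo (a sketch, with the queue and handler from the code above compressed; the latch blocks every task so each new submission forces the next step):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BurstDemo {
    public static void main(String[] args) throws InterruptedException {
        // Offer only when empty, otherwise force the TPE to add a thread.
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
            @Override
            public boolean offer(Runnable e) {
                return size() == 0 && super.offer(e);
            }
        };
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 50, 60,
                TimeUnit.SECONDS, queue);
        // At max threads, fall back to a blocking put(...).
        tpe.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });

        CountDownLatch hold = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            tpe.execute(() -> {
                try { hold.await(); } catch (InterruptedException ignored) { }
            });
        }
        // With every task blocked, the pool forks a new thread for almost
        // every submission instead of queueing behind the core thread.
        System.out.println("pool size: " + tpe.getPoolSize());
        hold.countDown();
        tpe.shutdown();
    }
}
```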

    Although in my example code above, the queue is unbounded, you could also define it as a bounded queue. For example, if you add a capacity of 1000 to the LinkedBlockingQueue then it will:

    1. scale the threads up to max
    2. then queue up until it is full with 1000 tasks
    3. then block the caller until space becomes available in the queue.
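    A sketch of that bounded variant; only the queue's constructor argument changes (the class and method names here are illustrative, not from the original answer):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedHandoffQueue {
    // Same anonymous-subclass trick as above, but bounded at 1000 so step 3
    // (blocking the caller in the handler's put(...)) can actually happen.
    public static BlockingQueue<Runnable> create() {
        return new LinkedBlockingQueue<Runnable>(1000) {
            private static final long serialVersionUID = 1L;
            @Override
            public boolean offer(Runnable e) {
                // Accept directly only when empty; otherwise return false so
                // the TPE tries to add a thread first.
                return size() == 0 && super.offer(e);
            }
        };
    }
}
```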

    In addition, if you really needed to use offer(...) in the RejectedExecutionHandler then you could use the offer(E, long, TimeUnit) method instead with Long.MAX_VALUE as the timeout.
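    That timed-offer variant might look like the following sketch (the class name is illustrative). Note that offer(E, long, TimeUnit) is a separate method that the anonymous queue subclass above does not override, so it blocks normally:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimedOfferHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // Long.MAX_VALUE is effectively "wait forever", so this behaves
            // like put(...) but stays on the offer(...) code path.
            if (!executor.getQueue().offer(r, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                throw new RejectedExecutionException("timed out queueing " + r);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```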

    Edit:

    I've tweaked my offer(...) method override per @Ralf's feedback. This will only scale up the number of threads in the pool if they are not keeping up with the load.

    Edit:

    Another tweak to this answer could be to actually ask the TPE if there are idle threads and only enqueue the item if there are. You would have to make a real class for this and add an ourQueue.setThreadPoolExecutor(tpe); method on it.

    Then your offer(...) method might look something like:

    1. Check to see if the tpe.getPoolSize() == tpe.getMaximumPoolSize() in which case just call super.offer(...).
    2. Else if tpe.getPoolSize() > tpe.getActiveCount() then call super.offer(...) since there seem to be idle threads.
    3. Otherwise return false to fork another thread.

    Maybe this:

    int poolSize = tpe.getPoolSize();
    int maximumPoolSize = tpe.getMaximumPoolSize();
    if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
        return super.offer(e);
    } else {
        return false;
    }
    

    Note that the get methods on TPE are expensive since they access volatile fields or (in the case of getActiveCount()) lock the TPE and walk the thread-list. Also, there are race conditions here that may cause a task to be enqueued improperly or another thread forked when there was an idle thread.
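    Putting that edit together, a sketch of such a class (ScalingQueue and setThreadPoolExecutor are illustrative names, and the race conditions noted above still apply):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class ScalingQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;
    private transient ThreadPoolExecutor tpe;

    // Must be called once, right after the executor is constructed.
    public void setThreadPoolExecutor(ThreadPoolExecutor tpe) {
        this.tpe = tpe;
    }

    @Override
    public boolean offer(Runnable e) {
        int poolSize = tpe.getPoolSize();
        // Enqueue if already at max threads, or if there seem to be idle
        // threads; otherwise return false so the TPE forks another thread.
        // (Racy, and getActiveCount() takes the pool's lock -- see above.)
        if (poolSize >= tpe.getMaximumPoolSize() || poolSize > tpe.getActiveCount()) {
            return super.offer(e);
        }
        return false;
    }
}
```

    It would be wired up as `ScalingQueue queue = new ScalingQueue(); ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue); queue.setThreadPoolExecutor(tpe);`, keeping the put(...)-based RejectedExecutionHandler from the answer.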
