如何实现阻塞ThreadPoolExecutor [英] How to implement blocking ThreadPoolExecutor

查看:160
本文介绍了如何实现阻塞ThreadPoolExecutor的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要实施阻止ThreadPoolExecutor.

这是我们企业应用程序中非常关键的要求.

This is a very crucial requirement in our enterprise application.

如果ThreadPoolExecutor.submit()ThreadPoolExecutor.execute()方法阻塞,直到某个线程被释放以接管新任务,它就会执行类似的操作.

It would do something like if ThreadPoolExecutor.submit() or ThreadPoolExecutor.execute() method blocks until a thread gets freed up for picking up a new task.

但是在当前实现中,如果所有池化线程都变得繁忙,则ThreadPoolExecutor.submit()ThreadPoolExecutor.execute()方法将引发RejectedExecutionException异常.

But in current implementation ThreadPoolExecutor.submit() and ThreadPoolExecutor.execute() methods throw RejectedExecutionException exception if all pooled threads get busy.

例如下面的代码抛出RejectedExecutionException:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingTPE {
    public static void main(String[] args) {
        ArrayBlockingQueue queue = new ArrayBlockingQueue(3);
        ThreadPoolExecutor tpExe = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
        int numJobs = 50;
        for (int i = 1; i <= numJobs; i++) {
            try {
                tpExe.submit(new WorkerThread(i));
                System.out.println("Added#" + (i));
            } catch (RejectedExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

class WorkerThread implements Runnable {
    int jobId;

    public WorkerThread(int jobId) {
        this.jobId = jobId;
    }

    public void run() {
        try {
            Thread.sleep(1000);
        }
        catch (Exception excep) {
        }
    }
}

推荐答案

ThreadPoolExecutor的javadoc所述:

As the javadoc of ThreadPoolExecutor states:

使用给定的初始参数创建一个新的ThreadPoolExecutor,并 默认的线程工厂和拒绝的执行处理程序.

Creates a new ThreadPoolExecutor with the given initial parameters and default thread factory and rejected execution handler.

被拒绝的执行程序处理程序是AbortPolicy的实例,如果队列不接受其他任务,则将调用该实例.从javadoc开始的行为:

The rejected executor handler is an instance of AbortPolicy which will be called if the queue does not accept another task. The behavior as of the javadoc:

总是抛出RejectedExecutionException.

Always throws RejectedExecutionException.

因此,阻塞队列对您没有任何影响.我以这种方式更改了您的代码,它的运行没有任何问题:

Hence the blocking queue does not have any effect for you. I changed your code this way and it runs without any issues:

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(3));
    try {
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        int numJobs = 50;
        for (int i = 1; i <= numJobs; i++) {
            try {
                executor.submit(new WorkerThread(i));
                System.out.println("Added#" + (i));
            } catch (RejectedExecutionException e) {
                e.printStackTrace();
            }
        }
    } finally {
        executor.shutdown();
    }
}

您必须做出的决定是:

  • 使用未绑定队列来支持所有延迟的任务.例如LinkedBlockingQueue.
  • 使用绑定队列,让当前线程执行不适合整个队列的任务.例如,请参阅我在答案中发布的代码.
  • 如果边界队列已满,请
  • 放弃任务.例如,使用ThreadPoolExecutor.DiscardPolicy作为拒绝的执行处理程序.
  • use an unbound queue to support all delayed tasks. For example LinkedBlockingQueue.
  • use a bound queue and let the current thread execute the task which does not fit into the full queue. For example see the code I posted along my answer.
  • discard tasks if the bounded queue is full. For example use ThreadPoolExecutor.DiscardPolicy as rejected execution handler.

这篇关于如何实现阻塞ThreadPoolExecutor的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆