如何使关闭与此自定义ExecutorService正常工作? [英] How can I make shutdown work properly with this custom ExecutorService?

查看:366
本文介绍了如何使关闭与此自定义ExecutorService正常工作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是我的代码我提交一些任务到ExecutorService,然后等待他们完成使用shutdown()和awaitTermination()。但是,如果任何一个任务需要更长的时间超过一定的时间来完成我想要它取消而不影响其他任务。我使用来自在超时后中断任务的ExecutorService的代码修正代码为如下:

  package com.jthink.jaikoz.memory; 

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent。*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;

private boolean isShutdown = false;

private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

//将任务映射到超时任务可以用来中断它
private final ConcurrentMap< Runnable,ScheduledFuture> runningTasks = new ConcurrentHashMap< Runnable,ScheduledFuture>();

public long getTimeout()
{
return timeout;
}

public TimeUnit getTimeoutUnit()
{
return timeoutUnit;
}

public TimeoutThreadPoolExecutor(int workerSize,ThreadFactory threadFactory,long timeout,TimeUnit timeoutUnit)
{
super(workerSize,workerSize,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue< Runnable>(),threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,long timeout,TimeUnit timeoutUnit){
super(corePoolSize,maximumPoolSize ,keepAliveTime,unit,workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,ThreadFactory threadFactory,long timeout,TimeUnit timeoutUnit){
super corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,RejectedExecutionHandler handler,long timeout,TimeUnit timeoutUnit){
super corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,long timeout,TimeUnit timeoutUnit){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

@Override
public void shutdown(){
isShutdown = true;
super.shutdown();
}

@Override
public List< Runnable> shutdownNow(){
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}

@Override
protected void beforeExecute(Thread t,Runnable r){
if(timeout> 0){
//任务在时间超时后中断正在运行任务的线程
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t),timeout,timeoutUnit);

//添加映射
runningTasks.put(r,scheduled);
}
}

@Override
protected void afterExecute(Runnable r,Throwable t){

//删除映射和取消超时task
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask!= null){
timeoutTask.cancel(false);
}

if(isShutdown)
{
if(getQueue()。isEmpty())
{
//队列为空所以所有任务完成或当前运行
MainWindow.logger.severe(--- Thread Pool Queue is Empty);
//timeoutExecutor.shutdownNow();
}
}
}

/ **
*中断线程
*
* /
TimeoutTask实现Runnable {
private final Thread thread;

public TimeoutTask(线程线程){
this.thread = thread;
}

@Override
public void run(){
MainWindow.logger.severe(因任务太长而取消任务);
thread.interrupt();
}
}
}

有时间完成,当它们都不工作时

  package com.jthink.jaikoz; 

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/ **
*由08年12月12日创建。
* /
public class TestThreadPool extends TestCase
{
public void testThreadPoolTask​​sComplete()throws Exception
{
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10,10 ,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue< Runnable>(),6,TimeUnit.SECONDS);

for(int i = 0; i <10; i ++)
{
executorService.submit(new Callable< Object>()
{
@Override
public Object call()throws Exception
{
Thread.sleep(5000);
System.out.println(Done);
return null;
}

});
}
executorService.shutdown();
executorService.awaitTermination(1,TimeUnit.DAYS);
System.out.println(Program done);
}

public void testThreadPoolTask​​sCancelled()throws Exception
{
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue< Runnable> ;(),3,TimeUnit.SECONDS);

for(int i = 0; i <10; i ++)
{
executorService.submit(new Callable< Object>()
{
@Override
public Object call()throws Exception
{
Thread.sleep(5000);
System.out.println(Done);
return null;
}

});
}
executorService.shutdown();
executorService.awaitTermination(1,TimeUnit.DAYS);
System.out.println(Program done);
}
}

并且在我的代码看起来工作:



私人布尔matchToRelease(ListMultimap< MatchKey,MetadataChangedWrapper> matchKeyToSongs)
throws JaikozException
{
if stopTask)
{
MainWindow.logger.warning(在matchToRelease中检测到Analyzer停止);
return false;
}

TimeoutThreadPoolExecutor es = getExecutorService();
List< Future< Boolean>> futures = new ArrayList< Future< Boolean>>(matchKeyToSongs.size());
for(MatchKey matchKey:matchKeyToSongs.keySet())
{
List< MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
futures.add(es.submit(new CorrectFromMusicBrainzWorker(this,stats,matchKey,songs)));
}
es.shutdown();
try
{
es.awaitTermination(matchKeyToSongs.keySet()。size()* es.getTimeout(),es.getTimeoutUnit());
}
catch(InterruptedException ie)
{
MainWindow.logger.warning(this.getClass()+has been interrupted);
return false;
}
return true;
}

但是对于一个客户,即使

  ---线程池队列为空

输出awaitTermination()不返回,只有当用户在两小时后取消任务时才最终返回 - 完整日志提取此处

  14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE:/ Volumes / 2TB外部/新iTunes库/ iTunes媒体/音乐/ XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11 
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE:---线程池队列为空
14/12/2014 22.18.01:com.jthink .jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask​​:警告:取消类com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser任务
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser已被中断

那么怎么可能awaiterTermination )没有返回,即使日志显示队列是空的,因此shutdown()已被调用的执行器本身和嵌入的timeoutExecutor?



我有几个


  1. 首先,为什么关闭awaitTermination()的TimeOutExecutor实际上是必要的,还是返回。在我的子类中,awaitTermination()不被覆盖,所以如果所有的任务都完成了什么事情,如果TiumeOutExecutor(awaitTermination()什么都不知道是否关闭)


  2. 其次,为什么---线程池队列为空有时可以多次输出



解决方案

我在 TimeoutThreadPoolExecutor 中进行了自定义修改,它工作正常。

  public static class TimeoutThreadPoolExecutor extends ThreadPoolExecutor 
{
private final long timeout;
private final TimeUnit timeoutUnit;
private boolean isShutdown = false;

private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap< Runnable,ScheduledFuture> runningTasks = new ConcurrentHashMap< Runnable,ScheduledFuture>();

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,long timeout,TimeUnit timeoutUnit){
super(corePoolSize,maximumPoolSize,keepAliveTime, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,ThreadFactory threadFactory,long timeout,TimeUnit timeoutUnit){
super corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,RejectedExecutionHandler handler,long timeout,TimeUnit timeoutUnit){
super corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,long timeout,TimeUnit timeoutUnit){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

@Override
public void shutdown(){
isShutdown = true;
super.shutdown();
}

@Override
public List< Runnable> shutdownNow(){
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}

@Override
protected void beforeExecute(Thread t,Runnable r){
if(timeout> 0){
final ScheduledFuture<? > scheduled = timeoutExecutor.schedule(new TimeoutTask(t),timeout,timeoutUnit);
runningTasks.put(r,scheduled);
}
}

@Override
protected void afterExecute(Runnable r,Throwable t){
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask!= null){
timeoutTask.cancel(false);
}
if(isShutdown)timeoutExecutor.shutdown();
}

类TimeoutTask实现Runnable {
private final Thread thread;

public TimeoutTask(线程线程){
this.thread = thread;
}

@Override
public void run(){
thread.interrupt();
System.out.println(Cancelled);
}
}
}

案例1:无超时

  final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
100,100,0L,TimeUnit.MILLISECONDS ,new LinkedBlockingQueue< Runnable>(),
6,TimeUnit.SECONDS);
executorService.submit(new Callable< Object>()
{
@Override
public Object call()throws Exception
{
Thread.sleep 5000);
System.out.println(Done);
return null;
}

});

executorService.shutdown();
executorService.awaitTermination(1,TimeUnit.DAYS);
System.out.println(Program done);

打印:

 任务完成
程序完成

案例2:

  final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
100,100,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue< Runnable>(),
3,TimeUnit.SECONDS);
executorService.submit(new Callable< Object>()
{
@Override
public Object call()throws Exception
{
Thread.sleep 5000);
System.out.println(Task done);
return null;
}

});

executorService.shutdown();
executorService.awaitTermination(1,TimeUnit.DAYS);
System.out.println(Program done);

打印:

 已取消
已执行的程式


I'm my code I submit some tasks to an ExecutorService and then wait for them to complete using shutdown() and awaitTermination(). But if any one tasks takes longer than a certain period to complete I want it cancelled without affecting other tasks. I use code amended code from ExecutorService that interrupts tasks after a timeout as follows:

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                //timeoutExecutor.shutdownNow();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

and a testcase for when tasks have time to complete and when they don't both work

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    public void testThreadPoolTasksComplete() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }

    public void testThreadPoolTasksCancelled() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}

and in my code appear to work:

private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

however for one customer even though

---Thread Pool Queue is Empty

is output awaitTermination() doesn't return,only eventually returning when user cancels task two hours later - full log extract here

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

So how can it be that awaiterTermination() is not returning even though the logs show queue is empty and therefore shutdown() has been called on both the Executor itself and the embedded timeoutExecutor ?

I have had a few thoughts about this myself but dont know the answer.

  1. Firstly why it is actually neccessary to shutdown the TimeOutExecutor for awaitTermination() to return anyway. In my subclass awaitTermination() is not overridden so if all tasks have completed what does it matter if the TiumeOutExecutor (that awaitTermination() knows nothing about is shutdown or not)

  2. Secondly why does ---Thread Pool Queue is Empty sometimes get output more than once

解决方案

I made a custom modification in TimeoutThreadPoolExecutor and it's working fine.

public static class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    private final long timeout;
    private final TimeUnit timeoutUnit;
    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
        if (isShutdown) timeoutExecutor.shutdown();
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
            System.out.println("Cancelled");
        }
    }
}

Case 1 : No timeout

final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
    100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
    6, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        Thread.sleep(5000);
        System.out.println("Done");
        return null;
    }

});

executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");

It prints :

Task done
Program done

Case 2 : Timeout

final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
    100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
    3, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        Thread.sleep(5000);
        System.out.println("Task done");
        return null;
    }

});

executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");

It prints :

Cancelled
Program done

这篇关于如何使关闭与此自定义ExecutorService正常工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆