处理ThreadPoolExecutor的异常 [英] Handling Exceptions for ThreadPoolExecutor

查看:182
本文介绍了处理ThreadPoolExecutor的异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码片段,它基本上扫描需要执行的任务列表,然后将每个任务交给执行程序执行。



JobExecutor 依次创建另一个执行器(用于执行数据库东西...读取和写入数据到队列)并完成任务。



JobExecutor 为提交的任务返回 Future< Boolean> 。当任何一个任务失败时,我想优雅地中断所有的线程,并通过捕获所有的异常关闭执行器。我需要做什么变化?

  public class DataMovingClass {
private static final AtomicInteger uniqueId = new AtomicInteger );

private static final ThreadLocal< Integer> uniqueNumber = new IDGenerator();

ThreadPoolExecutor threadPoolExecutor = null;

私人列表< Source> sources = new ArrayList< Source>();

private static class IDGenerator extends ThreadLocal< Integer> {
@Override
public Integer get(){
return uniqueId.incrementAndGet();
}
}

public void init(){

//加载源列表

}

public boolean execute(){

boolean succcess = true;
threadPoolExecutor = new ThreadPoolExecutor(10,10,
10,TimeUnit.SECONDS,new ArrayBlockingQueue< Runnable>(1024),
new ThreadFactory(){
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setName(DataMigration-+ uniqueNumber.get());
return t;
}结束方法
},新的ThreadPoolExecutor.CallerRunsPolicy());

List< Future< Boolean>> result = new ArrayList< Future< Boolean>>();

for(源代码:sources){
result.add(threadPoolExecutor.submit(new JobExecutor(source)));
}

for(Future< Boolean> jobDone:result){
try {
if(!jobDone.get(100000,TimeUnit.SECONDS)&& ; success){
//在成功的DbWriterClass的情况下,我们不需要更改
//它。
success = false;
}
} catch(Exception ex){
//处理例外
}
}

}
$ b b public class JobExecutor implements Callable< Boolean> {

private ThreadPoolExecutor threadPoolExecutor;
源jobSource;
public SourceJobExecutor(源代码){
this.jobSource = source;
threadPoolExecutor = new ThreadPoolExecutor(10,10,10,TimeUnit.SECONDS,new ArrayBlockingQueue< Runnable>(1024),
new ThreadFactory(){
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setName(Job Executor-+ uniqueNumber.get());
return t;
} //结束方法
},new ThreadPoolExecutor.CallerRunsPolicy());
}

public Boolean call()throws Exception {
boolean status = true;
System.out.println(Starting Job =+ jobSource.getName());
try {

//执行指定的任务;


} catch(InterruptedException intrEx){
logger.warn(InterruptedException,intrEx);
status = false;
} catch(Exception e){
logger.fatal(执行任务时发生异常+ jobSource.getName(),e);
status = false;
}
System.out.println(Ending Job =+ jobSource.getName());
return status;
}
}
}


解决方案>

当您向执行者提交任务时,它会向您返回一个 FutureTask 实例。



FutureTask.get() 会将任务抛出的任何异常重新抛出为 ExecutorException



所以当你遍历 List< Future> get on each,catch ExecutorException 并调用有序关机。


I have the following code snippet that basically scans through the list of task that needs to be executed and each task is then given to the executor for execution.

The JobExecutor in turn creates another executor (for doing db stuff...reading and writing data to queue) and completes the task.

JobExecutor returns a Future<Boolean> for the tasks submitted. When one of the task fails, I want to gracefully interrupt all the threads and shutdown the executor by catching all the exceptions. What changes do I need to do?

public class DataMovingClass {
    private static final AtomicInteger uniqueId = new AtomicInteger(0);

  private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();   

  ThreadPoolExecutor threadPoolExecutor  = null ;

   private List<Source> sources = new ArrayList<Source>();

    private static class IDGenerator extends ThreadLocal<Integer> {
        @Override
        public Integer get() {
            return uniqueId.incrementAndGet();
        }
  }

  public void init(){

    // load sources list

  }

  public boolean execute() {

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("DataMigration-" + uniqueNumber.get());
                        return t;
                    }// End method
                }, new ThreadPoolExecutor.CallerRunsPolicy());

     List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();

     for (Source source : sources) {
                    result.add(threadPoolExecutor.submit(new JobExecutor(source)));
     }

     for (Future<Boolean> jobDone : result) {
                try {
                    if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                        // in case of successful DbWriterClass, we don't need to change
                        // it.
                        success = false;
                    }
                } catch (Exception ex) {
                    // handle exceptions
                }
            }

  }

  public class JobExecutor implements Callable<Boolean>  {

        private ThreadPoolExecutor threadPoolExecutor ;
        Source jobSource ;
        public SourceJobExecutor(Source source) {
            this.jobSource = source;
            threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("Job Executor-" + uniqueNumber.get());
                            return t;
                        }// End method
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public Boolean call() throws Exception {
            boolean status = true ; 
            System.out.println("Starting Job = " + jobSource.getName());
            try {

                        // do the specified task ; 


            }catch (InterruptedException intrEx) {
                logger.warn("InterruptedException", intrEx);
                status = false ;
            } catch(Exception e) {
                logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
                status = false ;
            }
           System.out.println("Ending Job = " + jobSource.getName());
            return status ;
        }
    }
}   

解决方案

When you submit a task to the executor, it returns you a FutureTask instance.

FutureTask.get() will re-throw any exception thrown by the task as an ExecutorException.

So when you iterate through the List<Future> and call get on each, catch ExecutorException and invoke an orderly shutdown.

这篇关于处理ThreadPoolExecutor的异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆