处理 ThreadPoolExecutor 的异常 [英] Handling Exceptions for ThreadPoolExecutor

查看:115
本文介绍了处理 ThreadPoolExecutor 的异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码片段,它基本上扫描了需要执行的任务列表,然后将每个任务交给执行程序执行.

I have the following code snippet that basically scans through the list of task that needs to be executed and each task is then given to the executor for execution.

JobExecutor 依次创建另一个执行器(用于执行数据库操作...读取和写入数据到队列)并完成任务.

The JobExecutor in turn creates another executor (for doing db stuff...reading and writing data to queue) and completes the task.

JobExecutor 为提交的任务返回一个 Future.当其中一项任务失败时,我想通过捕获所有异常来优雅地中断所有线程并关闭执行程序.我需要做哪些改变?

JobExecutor returns a Future<Boolean> for the tasks submitted. When one of the task fails, I want to gracefully interrupt all the threads and shutdown the executor by catching all the exceptions. What changes do I need to do?

public class DataMovingClass {
    private static final AtomicInteger uniqueId = new AtomicInteger(0);

  private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();   

  ThreadPoolExecutor threadPoolExecutor  = null ;

   private List<Source> sources = new ArrayList<Source>();

    private static class IDGenerator extends ThreadLocal<Integer> {
        @Override
        public Integer get() {
            return uniqueId.incrementAndGet();
        }
  }

  public void init(){

    // load sources list

  }

  public boolean execute() {

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("DataMigration-" + uniqueNumber.get());
                        return t;
                    }// End method
                }, new ThreadPoolExecutor.CallerRunsPolicy());

     List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();

     for (Source source : sources) {
                    result.add(threadPoolExecutor.submit(new JobExecutor(source)));
     }

     for (Future<Boolean> jobDone : result) {
                try {
                    if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                        // in case of successful DbWriterClass, we don't need to change
                        // it.
                        success = false;
                    }
                } catch (Exception ex) {
                    // handle exceptions
                }
            }

  }

  public class JobExecutor implements Callable<Boolean>  {

        private ThreadPoolExecutor threadPoolExecutor ;
        Source jobSource ;
        public SourceJobExecutor(Source source) {
            this.jobSource = source;
            threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("Job Executor-" + uniqueNumber.get());
                            return t;
                        }// End method
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public Boolean call() throws Exception {
            boolean status = true ; 
            System.out.println("Starting Job = " + jobSource.getName());
            try {

                        // do the specified task ; 


            }catch (InterruptedException intrEx) {
                logger.warn("InterruptedException", intrEx);
                status = false ;
            } catch(Exception e) {
                logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
                status = false ;
            }
           System.out.println("Ending Job = " + jobSource.getName());
            return status ;
        }
    }
}   

推荐答案

当你向执行器提交任务时,它会返回一个 FutureTask 实例.

When you submit a task to the executor, it returns you a FutureTask instance.

FutureTask.get() 会将任务抛出的任何异常作为 ExecutorException 重新抛出.

因此,当您遍历 List 并对每个调用 get 时,捕获 ExecutorException 并调用有序关闭.

So when you iterate through the List<Future> and call get on each, catch ExecutorException and invoke an orderly shutdown.

这篇关于处理 ThreadPoolExecutor 的异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆