如何管理ThreadPoolExecutor中的池线程终止? [英] How to manage pool thread termination in ThreadPoolExecutor?

查看:180
本文介绍了如何管理ThreadPoolExecutor中的池线程终止?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述


要为池线程设置UncaughtExceptionHandler,请向ThreadPoolExecutor构造函数提供
ThreadFactory 。 (与所有
线程操作一样,只有线程的所有者应该更改其
UncaughtExceptionHandler。)标准线程池允许uncaught
任务异常终止池线程,但使用try-finally
块在发生这种情况时被通知,所以线程可以被替换。

没有未捕获的异常处理程序或其他失败通知
机制,任务可能会默默地失败,这可能非常
混乱。如果您希望在由于
异常导致任务失败时收到通知,以便您可以执行某些特定于任务的恢复操作,则
将包含Runnable或Callable的任务包含
例外或覆盖THReadPoolExecutor中的afterExecute钩子。


书不提供任何示例如何实现这一点。



您可以显示这种技术吗?



PS



我试图写代码示例:

  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,1,TimeUnit.MINUTES,new LinkedBlockingQueue& ,新的ThreadFactory(){
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread();
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t,Throwable e){
e.printStackTrace();
}
});
System.out.println(创建的线程与id+ Thread.currentThread()。getId());
返回线程;
}
});
Runnable runnable = new Runnable(){
@Override
public void run(){
System.out.println(Thread.currentThread()。getId()+ );
try {
Thread.sleep(1000);
} catch(InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread()。getId()+termination);
throw new RuntimeException();

}
};
threadPoolExecutor.submit(runnable);
threadPoolExecutor.submit(runnable);
threadPoolExecutor.submit(runnable);
threadPoolExecutor.submit(runnable);
}

但此输出

 创建的线程与ID 1 
创建的线程与ID 1

总是。

解决方案

你的代码有几个错误。首先,您必须将传递给您的 ThreadFactory Runnable 传递给已创建的线程,否则您将断开线程执行任务。其次,您正在工厂中打印 Thread.currentThread()的I​​D,这显然不是新创建的线程。这就是为什么它打印一个ID $ 1 两次的原因。



但是,修复这些错误后,你不会看到未知的例外。原因是当线程池执行器的行为如所述时,提交方法将 Runnable 包装到一个 FutureTask 可以自行捕获所有异常,以便在调用 get()时可以报告。



在这一点上,我们必须记住,有一个任意的$ code> Runnable s的队列,不必是 FutureTask 实例。因此,未知的例外仍然是可能的。当我们直接通过执行排队runnables。



所以固定的例子:

  public class Test {
public static void main(String [] args)throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 1,
TimeUnit.MINUTES,新的LinkedBlockingQueue(),r - > {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler((t,e) > {
synchronized(System.out){
System.out.println(在t.getId()中未捕获异常);
e.printStackTrace(System.out);
}
});
System.out.println(创建的线程与id+ thread.getId());
返回线程;
});
Runnable runnable =() - > {
System.out.println(Thread.currentThread()。getId()+started);
try {
Thread.sleep(1000);
} catch(InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread()。getId()+termination);
throw new RuntimeException();
};
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
threadPoolExecutor.shutdown();
}
}

将打印

 创建的线程与ID 11 
创建的线程与ID 12
11开始
12开始
11终止
12终止
创建的线程与ID 14
创建的线程与ID 15
未收到的异常在11
java.lang.RuntimeException
在smp.Test.lambda $ main $ 2(Test.java:28)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在java.util.concurrent.ThreadPoolExecutor $ Worker .run(ThreadPoolExecutor.java:617)
在java.lang.Thread.run(Thread.java:745)
15开始
14开始
12个$ b中未捕获的异常$ b java.lang.RuntimeException
at smp.Test.lambda $ main $ 2(Test.java:28)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)
在java.lang.Thread.run(Thread.java:745)
14终止
15终止
未收到的异常14
java.lang.RuntimeException
at smp .Test.lambda $ main $ 2(Test.java:28)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在java.util.concurrent.ThreadPoolExecutor $ Worker。 run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
15
中未捕获的异常java.lang.RuntimeException
在smp .Test.lambda $ main $ 2(Test.java:28)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在java.util.concurrent.ThreadPoolExecutor $ Worker。运行(ThreadPoolExecutor.java:617)
在java.lang.Thread.run(Thread.java:745)

当然,消息的数字和顺序可能不同。



如所引用的,您还可以通过覆盖 afterExecute

  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,1,
TimeUnit.MINUTES,new LinkedBlockingQueue()){
@Override protected void afterExecute(Runnable r,Throwable t){
if(t!= null)synchronized(System.out){
System.out.println(Uncaught exception in+ Thread.currentThread()的getId());
t.printStackTrace(System.out);
}
}
};

尽管在我的测试中,默认未捕获的异常处理程序也打印了堆栈跟踪...


Quote from concurrency in practice:

To set an UncaughtExceptionHandler for pool threads, provide a ThreadFactory to the ThreadPoolExecutor constructor. (As with all thread manipulation, only the thread's owner should change its UncaughtExceptionHandler.) The standard thread pools allow an uncaught task exception to terminate the pool thread, but use a try-finally block to be notified when this happens so the thread can be replaced. Without an uncaught exception handler or other failure notification mechanism, tasks can appear to fail silently, which can be very confusing. If you want to be notified when a task fails due to an exception so that you can take some task-specific recovery action, either wrap the task with a Runnable or Callable that catches the exception or override the afterExecute hook in THReadPoolExecutor.

Book doesn't provide any examples how to achieve this.

Can you show this technique ?

P.S.

I tried to write code sample:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread();
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    e.printStackTrace();
                }
            });
            System.out.println("created thread with id " + Thread.currentThread().getId());
            return thread;
        }
    });
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getId() + " started");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + " termination");
            throw new RuntimeException();

        }
    };
    threadPoolExecutor.submit(runnable);
    threadPoolExecutor.submit(runnable);
    threadPoolExecutor.submit(runnable);
    threadPoolExecutor.submit(runnable);
}

but this output

created thread with id 1
created thread with id 1

always.

解决方案

There are several errors in your code. First, you must pass the Runnable passed to your ThreadFactory to the created thread, otherwise, you leave a broken thread not executing any tasks. Second, you are printing the id of Thread.currentThread() in your factory, which is obviously not the freshly created thread. That’s why it prints an id of 1 two times.

Still, after fixing these errors, you won’t see uncaught exceptions. The reason is that while the thread pool executor behaves as stated, the submit methods wrap your Runnable into a FutureTask that catches all exceptions on its own, so that they can be reported when calling get().

At this point, we have to remember that there is a queue of arbitrary Runnables, which don’t have to be FutureTask instances. So uncaught exceptions are still possible, e.g. when we enqueue runnables directly via execute.

So the fixed example:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 1,
                TimeUnit.MINUTES, new LinkedBlockingQueue<>(), r -> {
                    Thread thread = new Thread(r);
                    thread.setUncaughtExceptionHandler((t, e) -> {
                        synchronized(System.out) {
                            System.out.println("Uncaught exception in "+t.getId());
                            e.printStackTrace(System.out);
                        }
                    });
                    System.out.println("created thread with id " + thread.getId());
                    return thread;
        });
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getId() + " started");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + " termination");
            throw new RuntimeException();
            };
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.shutdown();
    }
}

will print

created thread with id 11
created thread with id 12
11 started
12 started
11 termination
12 termination
created thread with id 14
created thread with id 15
Uncaught exception in 11
java.lang.RuntimeException
    at smp.Test.lambda$main$2(Test.java:28)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15 started
14 started
Uncaught exception in 12
java.lang.RuntimeException
    at smp.Test.lambda$main$2(Test.java:28)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14 termination
15 termination
Uncaught exception in 14
java.lang.RuntimeException
    at smp.Test.lambda$main$2(Test.java:28)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Uncaught exception in 15
java.lang.RuntimeException
    at smp.Test.lambda$main$2(Test.java:28)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Of course, the numbers and the order of the messages may differ.

As stated in the cite, you can also learn about exceptions by overriding afterExecute:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 1,
    TimeUnit.MINUTES, new LinkedBlockingQueue<>()) {
        @Override protected void afterExecute(Runnable r, Throwable t) {
            if(t!=null) synchronized(System.out) {
               System.out.println("Uncaught exception in "+Thread.currentThread().getId());
               t.printStackTrace(System.out);
            }
        }
    };

Though, in my tests, the default uncaught exception handler also printed stack traces…

这篇关于如何管理ThreadPoolExecutor中的池线程终止?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆