如何正确使用 Java Executor? [英] How to properly use Java Executor?
问题描述
我在我的多线程应用程序中使用了 Java Executors,但我似乎无法弄清楚使用以下每种方式的最佳时间:
1.
ExecutorService executor=Executors.newFixedThreadPool(50);executor.execute(new A_Runner(...一些参数...));executor.shutdown();while (!executor.isTerminated()) { Thread.sleep(100);}
2.
int Page_Count=200;ExecutorService executor=Executors.newFixedThreadPool(50);doneSignal=new CountDownLatch(Page_Count);for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));doneSignal.await();executor.shutdown();while (!executor.isTerminated()) { Thread.sleep(100);}
3.
int Executor_Count=30;ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());列表<未来<字符串>>期货=新的ArrayList(3330);for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));executor.shutdown();而 (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS);}for (Future 未来:期货){字符串 f=future.get();//...}
具体来说,在[2]中如果我跳过doneSignal,那么它会像[1]一样,那么doneSignal有什么用?
此外,在 [3] 中,如果我添加了 doneSignal 会怎样?或者有可能吗?
我想知道的是:这些方法是否可以互换,或者在某些情况下我应该使用上面的特定类型?
ExecutorService executor=Executors.newFixedThreadPool(50);
它简单易用.它隐藏了
ThreadPoolExecutor
的底层细节.当
Callable/Runnable
任务数量较少且无界队列中的任务堆积不会增加内存时,首选此方法.降低系统的性能.如果您有CPU/Memory
约束,请使用带有容量约束的ThreadPoolExecutor
RejectedExecutionHandler
处理拒绝任务.您已经用给定的计数初始化了
CountDownLatch
.该计数通过调用countDown()
方法递减.我假设您稍后在 Runnable 任务中调用 decrement.等待此计数达到零的线程可以调用await()
方法之一.调用await()
会阻塞线程,直到计数达到零.这个类允许一个java线程等待其他线程集完成它们的任务.用例:
实现最大并行度:有时我们希望同时启动多个线程以实现最大并行度
等待 N 个线程完成后再开始执行
死锁检测.
看看这个 Lokesh Gupta 撰写的文章了解更多详情.
ThreadPoolExecutor:它提供了更多的控制来微调各种线程池参数.如果您的应用程序受到活动
Runnable/Callable
任务数量的限制,您应该通过设置最大容量来使用有界队列.一旦队列达到最大容量,您就可以定义 RejectionHandler.Java 提供了四种类型的RejectedExecutionHandler
政策.在默认的
ThreadPoolExecutor.AbortPolicy
中,处理程序在拒绝时抛出运行时 RejectedExecutionException.在
ThreadPoolExecutor.CallerRunsPolicy
中,调用execute自身的线程运行任务.这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度.在
ThreadPoolExecutor.DiscardPolicy
中,无法执行的任务被简单地丢弃.在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行器没有关闭,工作队列头部的任务被丢弃,然后重试执行(可能会再次失败,导致这要重复.)如果你想模拟
CountDownLatch
行为,你可以使用invokeAll()
方法.
您没有引用的另一种机制是 ForkJoinPool
ForkJoinPool
在 Java 7 中被添加到 Java 中.ForkJoinPool
类似于JavaExecutorService
但有一个区别.ForkJoinPool
使它任务很容易将他们的工作分成更小的任务,然后也提交到ForkJoinPool
.当空闲的工作线程从繁忙的工作线程队列中窃取任务时,任务窃取发生在ForkJoinPool
中.Java 8 在 中引入了另一个 APIExecutorService 创建工作窃取池.您不必创建
RecursiveTask
和RecursiveAction
但仍然可以使用ForkJoinPool
.public static ExecutorService newWorkStealingPool()
<块引用>
使用所有可用的处理器作为其目标并行度级别,创建一个窃取工作的线程池.
默认情况下,它会以 CPU 内核数为参数.
这四种机制是相辅相成的.根据您想要控制的粒度级别,您必须选择正确的粒度.
I've used Java Executors in my multi-threading apps, but I can't seem to figure out when is the best to use each of the following ways:
1.
ExecutorService executor=Executors.newFixedThreadPool(50);
executor.execute(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
2.
int Page_Count=200;
ExecutorService executor=Executors.newFixedThreadPool(50);
doneSignal=new CountDownLatch(Page_Count);
for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));
doneSignal.await();
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
3.
int Executor_Count=30;
ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());
List<Future<String>> futures=new ArrayList<>(3330);
for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS); }
for (Future<String> future : futures)
{
String f=future.get();
// ...
}
Specifically, in [2] what if I skip the doneSignal, then it'll be like [1], so what's the use for the doneSignal?
Also, in [3], what if I add a doneSignal? Or is it possible?
What I'd like to know is : are these approaches interchangeable, or is there a certain situation that I'm supposed to use a specific type above?
-
ExecutorService executor=Executors.newFixedThreadPool(50);
It is simple and easy to use. It hides low level details of
ThreadPoolExecutor
.Prefer this one when number of
Callable/Runnable
tasks are small in number and piling of tasks in unbounded queue does not increase memory & degrade the performance of the system. If you haveCPU/Memory
constraints, useThreadPoolExecutor
with capacity constraints &RejectedExecutionHandler
to handle rejection of tasks. -
You have initialized
CountDownLatch
with a given count. This count is decremented by calls to thecountDown()
method. I am assuming that you are calling decrement in your Runnable task later. Threads waiting for this count to reach zero can call one of theawait()
methods. Callingawait()
blocks the thread until the count reaches zero. This class enables a java thread to wait until other set of threads completes their tasks.Use cases:
Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism
Wait N threads to completes before start execution
Deadlock detection.
Have a look at this article by Lokesh Gupta for more details.
ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active
Runnable/Callable
tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types ofRejectedExecutionHandler
policies.In the default
ThreadPoolExecutor.AbortPolicy
, the handler throws a runtime RejectedExecutionException upon rejection.In
ThreadPoolExecutor.CallerRunsPolicy
, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.In
ThreadPoolExecutor.DiscardPolicy
, a task that cannot be executed is simply dropped.In
ThreadPoolExecutor.DiscardOldestPolicy
, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)If you want to simulate
CountDownLatch
behaviour, you can useinvokeAll()
method.
One more mechanism you did not quote is ForkJoinPool
The
ForkJoinPool
was added to Java in Java 7. TheForkJoinPool
is similar to the JavaExecutorService
but with one difference. TheForkJoinPool
makes it easy for tasks to split their work up into smaller tasks which are then submitted to theForkJoinPool
too. Task stealing happens inForkJoinPool
when free worker threads steal tasks from busy worker thread queue.Java 8 has introduced one more API in ExecutorService to create work stealing pool. You don't have to create
RecursiveTask
andRecursiveAction
but still can useForkJoinPool
.public static ExecutorService newWorkStealingPool()
Creates a work-stealing thread pool using all available processors as its target parallelism level.
By default, it will take number of CPU cores as parameter.
All these four mechanism are complimentary to each other. Depending on level of granularity you want to control, you have to chose right ones.
这篇关于如何正确使用 Java Executor?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!