如何正确使用 Java Executor? [英] How to properly use Java Executor?

查看:41
本文介绍了如何正确使用 Java Executor?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的多线程应用程序中使用了 Java Executors,但我似乎无法弄清楚使用以下每种方式的最佳时间:

1.

ExecutorService executor=Executors.newFixedThreadPool(50);executor.execute(new A_Runner(...一些参数...));executor.shutdown();while (!executor.isTerminated()) { Thread.sleep(100);}

2.

int Page_Count=200;ExecutorService executor=Executors.newFixedThreadPool(50);doneSignal=new CountDownLatch(Page_Count);for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));doneSignal.await();executor.shutdown();while (!executor.isTerminated()) { Thread.sleep(100);}

3.

int Executor_Count=30;ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());列表<未来<字符串>>期货=新的ArrayList(3330);for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));executor.shutdown();而 (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS);}for (Future 未来:期货){字符串 f=future.get();//...}

具体来说,在[2]中如果我跳过doneSignal,那么它会像[1]一样,那么doneSignal有什么用?

此外,在 [3] 中,如果我添加了 doneSignal 会怎样?或者有可能吗?

我想知道的是:这些方法是否可以互换,或者在某些情况下我应该使用上面的特定类型?

解决方案

  1. ExecutorService

    ExecutorService executor=Executors.newFixedThreadPool(50);

    它简单易用.它隐藏了 ThreadPoolExecutor 的底层细节.

    Callable/Runnable任务数量较少且无界队列中的任务堆积不会增加内存时,首选此方法.降低系统的性能.如果您有 CPU/Memory 约束,请使用带有容量约束的 ThreadPoolExecutorRejectedExecutionHandler 处理拒绝任务.

  2. CountDownLatch

    您已经用给定的计数初始化了 CountDownLatch.该计数通过调用 countDown() 方法递减.我假设您稍后在 Runnable 任务中调用 decrement.等待此计数达到零的线程可以调用 await() 方法之一.调用 await() 会阻塞线程,直到计数达到零.这个类允许一个java线程等待其他线程集完成它们的任务.

    用例:

    1. 实现最大并行度:有时我们希望同时启动多个线程以实现最大并行度

    2. 等待 N 个线程完成后再开始执行

    3. 死锁检测.

      看看这个 Lokesh Gupta 撰写的文章了解更多详情.

  3. ThreadPoolExecutor:它提供了更多的控制来微调各种线程池参数.如果您的应用程序受到活动 Runnable/Callable 任务数量的限制,您应该通过设置最大容量来使用有界队列.一旦队列达到最大容量,您就可以定义 RejectionHandler.Java 提供了四种类型的 RejectedExecutionHandler 政策.

    1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序在拒绝时抛出运行时 RejectedExecutionException.

    2. ThreadPoolExecutor.CallerRunsPolicy中,调用execute自身的线程运行任务.这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度.

    3. ThreadPoolExecutor.DiscardPolicy中,无法执行的任务被简单地丢弃.

    4. ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器没有关闭,工作队列头部的任务被丢弃,然后重试执行(可能会再次失败,导致这要重复.)

      如果你想模拟 CountDownLatch 行为,你可以使用 invokeAll() 方法.

  4. 您没有引用的另一种机制是 ForkJoinPool

    ForkJoinPool 在 Java 7 中被添加到 Java 中.ForkJoinPool 类似于Java ExecutorService 但有一个区别.ForkJoinPool 使它任务很容易将他们的工作分成更小的任务,然后也提交到 ForkJoinPool.当空闲的工作线程从繁忙的工作线程队列中窃取任务时,任务窃取发生在 ForkJoinPool 中.

    Java 8 在 中引入了另一个 APIExecutorService 创建工作窃取池.您不必创建 RecursiveTaskRecursiveAction 但仍然可以使用 ForkJoinPool.

     public static ExecutorService newWorkStealingPool()

    <块引用>

    使用所有可用的处理器作为其目标并行度级别,创建一个窃取工作的线程池.

    默认情况下,它会以 CPU 内核数为参数.

这四种机制是相辅相成的.根据您想要控制的粒度级别,您必须选择正确的粒度.

I've used Java Executors in my multi-threading apps, but I can't seem to figure out when is the best to use each of the following ways:

1.

ExecutorService executor=Executors.newFixedThreadPool(50);
executor.execute(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }

2.

int Page_Count=200;
ExecutorService executor=Executors.newFixedThreadPool(50);
doneSignal=new CountDownLatch(Page_Count);
for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));
doneSignal.await();
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }

3.

int Executor_Count=30;
ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());
List<Future<String>> futures=new ArrayList<>(3330);

for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS); }
for (Future<String> future : futures)
{
    String f=future.get();
    // ...
}

Specifically, in [2] what if I skip the doneSignal, then it'll be like [1], so what's the use for the doneSignal?

Also, in [3], what if I add a doneSignal? Or is it possible?

What I'd like to know is : are these approaches interchangeable, or is there a certain situation that I'm supposed to use a specific type above?

解决方案

  1. ExecutorService

    ExecutorService executor=Executors.newFixedThreadPool(50);

    It is simple and easy to use. It hides low level details of ThreadPoolExecutor.

    Prefer this one when number of Callable/Runnable tasks are small in number and piling of tasks in unbounded queue does not increase memory & degrade the performance of the system. If you have CPU/Memory constraints, use ThreadPoolExecutor with capacity constraints & RejectedExecutionHandler to handle rejection of tasks.

  2. CountDownLatch

    You have initialized CountDownLatch with a given count. This count is decremented by calls to the countDown() method. I am assuming that you are calling decrement in your Runnable task later. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero. This class enables a java thread to wait until other set of threads completes their tasks.

    Use cases:

    1. Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism

    2. Wait N threads to completes before start execution

    3. Deadlock detection.

      Have a look at this article by Lokesh Gupta for more details.

  3. ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active Runnable/Callable tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types of RejectedExecutionHandler policies.

    1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.

    2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.

    3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.

    4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

      If you want to simulate CountDownLatch behaviour, you can use invokeAll() method.

  4. One more mechanism you did not quote is ForkJoinPool

    The ForkJoinPool was added to Java in Java 7. The ForkJoinPool is similar to the Java ExecutorService but with one difference. The ForkJoinPool makes it easy for tasks to split their work up into smaller tasks which are then submitted to the ForkJoinPool too. Task stealing happens in ForkJoinPool when free worker threads steal tasks from busy worker thread queue.

    Java 8 has introduced one more API in ExecutorService to create work stealing pool. You don't have to create RecursiveTask and RecursiveAction but still can use ForkJoinPool.

      public static ExecutorService newWorkStealingPool()
    

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

    By default, it will take number of CPU cores as parameter.

All these four mechanism are complimentary to each other. Depending on level of granularity you want to control, you have to chose right ones.

这篇关于如何正确使用 Java Executor?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆