How to Block a Queue in ForkJoinPool?


Question

I need to block threads on a ForkJoinPool when its queue is full. This can be done with the standard ThreadPoolExecutor, e.g.:

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            5000L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize, true),  // bounded, fair queue
            new ThreadPoolExecutor.CallerRunsPolicy());         // submitter blocks by running the task itself
}

I know there is some Deque inside ForkJoinPool, but I don't have access to it via its API.

Update: see the answer below.

Answer

After some research I am happy to answer the question:

Reason: There is no such option in ForkJoinPool's implementation, for the following reason. The majority of j.u.c. executors assume a single concurrent queue shared by many threads. This leads to contention on the queue and degrades performance when multiple threads read from and write to it. Such an approach is therefore not very scalable: high contention on the queue generates a large number of context switches and wastes CPU time on coordination rather than useful work.

Implementation: In ForkJoinPool, each worker thread has its own double-ended queue (Deque) backed by an array. To minimize contention, work-stealing happens at the tail of the deque, whereas task submission happens at the head by the current thread (worker). The tail holds the largest units of work. In other words, having another worker thread steal from the tail minimizes the number of interactions between workers: less contention, better overall performance.
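To make the push/steal split concrete, here is a minimal (hypothetical) fork/join example: each `fork()` pushes the subtask onto the calling worker's own deque, while idle workers steal from the opposite end. The class and range names are illustrative, not from the original post.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo {
    // RangeSum sums the inclusive range [from, to] by splitting in half.
    static class RangeSum extends RecursiveTask<Long> {
        final long from, to;
        RangeSum(long from, long to) { this.from = from; this.to = to; }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {            // small enough: sum directly
                long sum = 0;
                for (long i = from; i <= to; i++) sum += i;
                return sum;
            }
            long mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid + 1, to);
            left.fork();                         // pushed onto this worker's own deque
            long rightSum = right.compute();     // keep working on the other half
            return rightSum + left.join();       // join the (possibly stolen) subtask
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.invoke(new RangeSum(1, 1_000_000)));  // 500000500000
    }
}
```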

Scalability benchmarks are shown in "Let it crash - Scalability of Fork Join Pool".

Work-around thoughts: There are global submission queues. Submissions from non-FJ threads enter these submission queues, and workers take tasks from them. There are also the per-worker queues mentioned above.

The maximum size of a queue is limited by this constant:

/**
 * Maximum size for queue arrays. Must be a power of two less
 * than or equal to 1 << (31 - width of array entry) to ensure
 * lack of wraparound of index calculations, but defined to a
 * value a bit less than this to help users trap runaway
 * programs before saturating systems.
 */
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

When the queue is full, an unchecked exception is thrown:

RejectedExecutionException("Queue capacity exceeded")

This is described in the javadocs.

(Also, see ForkJoinPool's constructor parameter UncaughtExceptionHandler.)

I tend to conclude that the current implementation has no such mechanism, and that it should be implemented by us in the consuming code.

For example, this could be done as follows:

  1. Implement exponential back-off logic that periodically retries submitting a rejected task, increasing the time interval before each retry. Or..
  2. Write a throttler that periodically checks the size of the submission queue (see ForkJoinPool.getQueuedSubmissionCount()).
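A sketch combining both ideas might look like the following. The `ThrottledSubmitter` class and the `maxPending` limit are hypothetical names for illustration; only `getQueuedSubmissionCount()` and `submit()` are real ForkJoinPool API.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class ThrottledSubmitter {
    private final ForkJoinPool pool;
    private final int maxPending;   // chosen limit on pending submissions, tune per workload

    public ThrottledSubmitter(ForkJoinPool pool, int maxPending) {
        this.pool = pool;
        this.maxPending = maxPending;
    }

    // Blocks the caller with exponential back-off while the submission
    // queue holds at least maxPending tasks, then submits.
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) throws InterruptedException {
        long backoffMillis = 1;
        while (pool.getQueuedSubmissionCount() >= maxPending) {
            TimeUnit.MILLISECONDS.sleep(backoffMillis);
            backoffMillis = Math.min(backoffMillis * 2, 1_000);  // cap the wait at 1 s
        }
        return pool.submit(task);
    }

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        ThrottledSubmitter submitter = new ThrottledSubmitter(pool, 100);
        ForkJoinTask<Integer> task = submitter.submit(ForkJoinTask.adapt(() -> 21 + 21));
        System.out.println(task.join());  // 42
    }
}
```

Note that `getQueuedSubmissionCount()` is only an estimate, so this throttles the submission rate rather than enforcing a hard bound the way `ArrayBlockingQueue` does.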

Here is the official JSR-166e Java source of ForkJoinPool for more information.
