How to (globally) replace the common thread pool backend of Java parallel streams?

Question

I would like to globally replace the common thread pool that Java parallel streams use by default, for example for

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});

I know that it is possible to use a dedicated ForkJoinPool by submitting such an instruction to a dedicated thread pool (see Custom thread pool in Java 8 parallel stream). The question here is:

  • Is it possible to replace the common ForkJoinPool with some other implementation (say, Executors.newFixedThreadPool(10))?
  • Is it possible to do so via some global setting, e.g. a JVM property?
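For reference, the dedicated-pool workaround mentioned above can be sketched as follows. This is a minimal illustration, not a global replacement: the class and method names are hypothetical, and the fact that a parallel stream started from inside a ForkJoinPool task runs in that pool is observed behaviour of current JDK implementations rather than a documented guarantee.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class DedicatedPoolExample {

    // Hypothetical helper: sums 0..n-1 with a parallel stream started inside a dedicated pool.
    static int sumInDedicatedPool(int parallelism, int n) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation is invoked on a worker thread of 'pool'.
            // Assumption: the stream then forks its subtasks into 'pool'
            // instead of the common pool (implementation behaviour).
            Callable<Integer> task = () -> IntStream.range(0, n).parallel().sum();
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInDedicatedPool(4, 100));
    }
}
```

Every call site has to opt in this way, which is exactly why it does not answer the "globally" part of the question.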

Remark: The reason I would like to replace the F/J pool is that it appears to have a bug which makes it unusable for nested parallel loops.

Nested parallel loops have poor performance and may lead to deadlocks, see http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

For example: The following code leads to a deadlock:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});

(This happens even without any additional code at "do work here", provided that the parallelism is set to less than 12.)

My question is how to replace the FJP. If you would like to discuss nested parallel loops, see "Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?".

Recommended answer

I think that's not the way the stream API is intended to be used. It seems you're (mis)using it simply for parallel task execution (focusing on the task, not the data) instead of parallel stream processing (focusing on the data in the stream). Your code somehow violates one of the main principles for streams: avoid state and side effects. (I'm writing "somehow" because this is not really forbidden, only discouraged.)

Apart from that (or maybe because of the side effects), you're using heavy synchronization within your outer loop, which is anything but harmless!

Although it is not mentioned in the documentation, parallel streams use the common ForkJoinPool internally. Whether or not that is a gap in the documentation, we simply have to accept the fact. The JavaDoc of ForkJoinTask states:

It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool.ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool.getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.
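To make point (3) of the quote concrete, here is a small sketch of the ForkJoinPool.ManagedBlocker API wrapped around a lock. The ManagedLocker class follows the pattern shown in the ManagedBlocker JavaDoc; the surrounding class, the method name and the pool size of 2 are assumptions chosen for illustration, and the sketch does not claim to fix the nested-loop problem from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.locks.ReentrantLock;

class ManagedLockExample {

    // Tells the pool that the current task may block while acquiring 'lock'.
    static final class ManagedLocker implements ForkJoinPool.ManagedBlocker {
        private final ReentrantLock lock;
        private boolean hasLock = false;

        ManagedLocker(ReentrantLock lock) { this.lock = lock; }

        @Override
        public boolean block() {
            if (!hasLock) {
                lock.lock();
                hasLock = true;
            }
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            // Try the non-blocking path first.
            return hasLock || (hasLock = lock.tryLock());
        }
    }

    // Hypothetical helper: each task increments a shared counter under the managed lock.
    static int countWithManagedLock(int tasks) {
        ReentrantLock lock = new ReentrantLock();
        int[] counter = {0};
        ForkJoinPool pool = new ForkJoinPool(2); // small pool, chosen for illustration
        Runnable work = () -> {
            try {
                // The pool may add a compensating thread while this task waits.
                ForkJoinPool.managedBlock(new ManagedLocker(lock));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            try {
                counter[0]++;
            } finally {
                lock.unlock();
            }
        };
        try {
            List<ForkJoinTask<?>> submitted = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                submitted.add(pool.submit(work));
            }
            for (ForkJoinTask<?> t : submitted) {
                t.join(); // wait for completion; also makes the counter updates visible
            }
        } finally {
            pool.shutdown();
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithManagedLock(50));
    }
}
```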

Again, it seems that you're using streams as a replacement for a simple for-loop and an executor service.
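For comparison, a plain for-loop with an executor service might look like the sketch below. The doWork method is a hypothetical stand-in for the real task, and the pool size is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorLoopExample {

    // Hypothetical stand-in for the real work of task i.
    static int doWork(int i) {
        return i * i;
    }

    // Runs n independent tasks on a fixed pool and adds up their results.
    static long runTasks(int n, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int index = i;
                Callable<Integer> task = () -> doWork(index);
                futures.add(executor.submit(task));
            }
            long total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // blocks only the calling thread
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10, 4));
    }
}
```

The results are combined on the calling thread, so the tasks themselves need no shared state or synchronization.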

  • If you just want to execute n tasks in parallel, use an ExecutorService.
  • If you have a more complex example where tasks create subtasks, consider using a ForkJoinPool (with ForkJoinTasks) instead. (It keeps the number of threads constant without the danger of a deadlock caused by too many tasks waiting for others to complete, because waiting tasks do not block their executing threads.)
  • If you want to process data (in parallel), consider using the stream API.
  • You cannot "install" a custom common pool. It is created internally in private static code.
  • But you can influence the parallelism, the thread factory and the exception handler of the common pool using certain system properties (see the JavaDoc of ForkJoinPool).
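As an illustration of the last point, the parallelism of the common pool can be requested through the system property java.util.concurrent.ForkJoinPool.common.parallelism (the related properties end in common.threadFactory and common.exceptionHandler). The sketch below sets it programmatically; the class and method names are hypothetical. The property is only read when the common pool is initialized, so setting it on the command line with -D is the more reliable route, and the value returned here may differ from the requested one if the pool already existed.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolConfigExample {

    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Requests a parallelism for the common pool and reports what the pool actually uses.
    // Assumption: this runs before anything else has touched ForkJoinPool;
    // otherwise the request comes too late and is ignored.
    static int configureAndReadParallelism(int requested) {
        System.setProperty(PARALLELISM_PROPERTY, Integer.toString(requested));
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int effective = configureAndReadParallelism(3);
        System.out.println("Common pool parallelism in effect: " + effective);
    }
}
```

This only tunes the existing common ForkJoinPool; it does not swap in a different executor implementation.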

Don't mix up ExecutorService and ForkJoinPool. They are (usually) not a replacement for each other!
