Why does the parallel stream not use all the threads of the ForkJoinPool?

Question

I know that if you use parallelStream without a custom ForkJoinPool, it runs on the default (common) ForkJoinPool, which by default has one fewer thread than you have processors.
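A quick way to check that claim on a given machine is to print both numbers. This is only a small sketch; the class and method names (CommonPoolInfo, processors, commonParallelism) are made up for illustration, and the result can differ if the java.util.concurrent.ForkJoinPool.common.parallelism system property is set.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Number of processors the JVM reports.
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool used by parallel streams by default.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("processors: " + processors());
        System.out.println("common pool parallelism: " + commonParallelism());
    }
}
```

The thread that calls the terminal operation also takes part in the work, which is why a machine with 4 processors can show 4 busy threads even though the common pool itself targets 3.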

So, as stated here (and also in the other answer to that question), in order to get more parallelism, you have to:

Submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething));

So, I did this:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

I built a Set of thread names to see how many threads are created, and I also logged the pool's active thread count. Neither number grows beyond 16, which means the parallelism here never exceeds 16 (why 16, of all numbers?). If I do not use the custom forkJoinPool, I get a parallelism of 4, which matches the number of processors I have.

Why does it give me 16 and not 1000?

Recommended answer

Update

Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and does not even reach the prescribed parallelism level, because there are always idle workers available to process the stream.

That was wrong.

The actual answer is given in the original question that this one was marked as a duplicate of: using a custom ForkJoinPool for stream processing is not officially supported, and when forEach is used, the default pool's parallelism determines how the stream's spliterator is split.
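To make the arithmetic behind the 16 concrete, here is a rough model of how a JDK 8 parallel forEach might decide when to stop splitting. It is a sketch under stated assumptions, not the JDK's code: it assumes the leaf target is the common pool's parallelism times four (my reading of the JDK 8 stream sources), it assumes the source always splits evenly in half, and the names SplitEstimate, targetSize, countLeaves and leafCount are hypothetical. Later JDK releases may derive the target from the pool running the task instead, so verify on your own runtime.

```java
public class SplitEstimate {

    // Assumed heuristic: aim for roughly (parallelism * 4) leaf tasks,
    // so a chunk is small enough once it holds size / (parallelism * 4) elements.
    static long targetSize(long sizeEstimate, int parallelism) {
        long est = sizeEstimate / ((long) parallelism << 2);
        return est > 0L ? est : 1L;
    }

    // Halve the range until each piece is at or below the threshold,
    // and count how many pieces (leaf tasks) remain.
    static int countLeaves(long size, long threshold) {
        if (size <= threshold || size <= 1) {
            return 1;
        }
        long left = size / 2;
        return countLeaves(left, threshold) + countLeaves(size - left, threshold);
    }

    static int leafCount(long size, int parallelism) {
        return countLeaves(size, targetSize(size, parallelism));
    }

    public static void main(String[] args) {
        // Common pool parallelism of 3 (a 4-processor machine), range of 999999 elements.
        System.out.println(leafCount(999_999, 3));    // prints 16
        // What the count would be if the custom pool's parallelism of 1000 were used instead.
        System.out.println(leafCount(999_999, 1000));
    }
}
```

With 4 processors the common pool parallelism is 3, so the model gives a target of 12 leaves, a threshold of 83333 elements, and 16 leaf tasks after four rounds of halving. Each leaf is processed sequentially by one thread, so at most 16 threads can be busy at once, which would explain the number in the question regardless of the custom pool's size.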

Here is an example showing that when tasks are submitted manually to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level:

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}

Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer was wrong :)
