Why does the parallel stream not use all the threads of the ForkJoinPool?


Question

So I know that if you use parallelStream without a custom ForkJoinPool, it uses the default (common) ForkJoinPool, which by default has one fewer thread than you have processors.

So, as stated here (and also in the other answer of that question) in order to have more parallelism, you have to:

submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething));

So, I did this:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

I made a Set of thread names to see how many threads are created, and also logged the pool's active thread count. Neither number grows beyond 16, so the parallelism here never exceeds 16 (why 16, of all numbers?). If I do not use the custom forkJoinPool, I get a parallelism of 4, which matches the number of processors I have.

Why does it give me 16 and not 1000?

Answer

Update

Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.

That's incorrect.

The actual answer is provided in the original question to which this was marked as duplicate - using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default pool parallelism is used to determine the stream spliterator behavior.
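To make the "16" concrete, here is a rough sketch of the arithmetic. It assumes, based on my reading of the JDK 8 stream implementation rather than anything stated above, that the splitting threshold is the stream size divided by four times the common pool parallelism, and that a range is halved until a chunk is no larger than that threshold. The class and method names (LeafCountSketch, leafCount, countLeaves) are made up for illustration, and later JDK versions may behave differently.

```java
import java.util.concurrent.ForkJoinPool;

public class LeafCountSketch {

    // Assumption: leaf target = commonParallelism * 4, threshold = size / leafTarget (at least 1).
    static int leafCount(long size, int commonParallelism) {
        long leafTarget = (long) commonParallelism << 2;
        long threshold = Math.max(size / leafTarget, 1L);
        return countLeaves(size, threshold);
    }

    // Halve the range until a chunk is no larger than the threshold; each such chunk is one leaf task.
    static int countLeaves(long size, long threshold) {
        if (size <= threshold) {
            return 1;
        }
        long left = size / 2;
        return countLeaves(left, threshold) + countLeaves(size - left, threshold);
    }

    public static void main(String[] args) {
        // 4 processors give a common pool parallelism of 3 by default.
        System.out.println(leafCount(999_999L, 3)); // prints 16
        // Parallelism of the common pool on the machine running this sketch.
        System.out.println(ForkJoinPool.getCommonPoolParallelism());
    }
}
```

Under these assumptions the stream from the question is cut into only 16 leaf tasks, so no more than about 16 workers can be busy at once, however large the custom pool is.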

Here's an example showing that when tasks are submitted manually to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level:

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}
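The loop above relies on forkJoinPool and thNames from the question. A self-contained variant might look like the following sketch; the class name ManualSubmitSketch, the method distinctWorkers, and the pool size, task count and sleep time are all arbitrary choices for illustration, not part of the original answer.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ManualSubmitSketch {

    // Submits independent sleeping tasks and returns how many distinct worker threads ran them.
    static int distinctWorkers(int parallelism, int tasks) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Set<String> names = Collections.synchronizedSet(new HashSet<>());
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                names.add(Thread.currentThread().getName());
            });
        }
        // Already submitted tasks still run after shutdown(); wait for them to finish.
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct workers: " + distinctWorkers(8, 64));
    }
}
```

Because every task is submitted on its own, nothing depends on how a stream was split, and the number of distinct workers is bounded by the pool's own parallelism rather than by the common pool's.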

Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)
