ForkJoinPool的大小动态增加? [英] ForkJoinPool size increasing dynamically?
问题描述
相关: ParallelStream上的CompletableFuture被批处理并且比顺序流运行得慢吗?
我正在研究通过parallelStream和CompletableFutures并行化网络调用的不同方式.因此,我遇到了这种情况,Java的parallelStream使用的ForkJoinPool.commonPool()的大小正在动态增长,从〜#Cores到最大值64.
I'm doing some research on different ways of parallelizing network calls through parallelStream and CompletableFutures. As such, I have come across this situation where the ForkJoinPool.commonPool(), which is used by java's parallelStream, is dynamically growing in size, from ~ #Cores, to Max value of 64.
Java详细信息: $ java -version
Java details:
$ java -version
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
下面显示了这种行为的代码(完整的可执行代码此处)
Code that shows such behavior is below (Full executable code here)
public static int loops = 100;
private static long sleepTimeMs = 1000;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
// this method shows dynamic increase in pool size
public static void m1() {
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
.parallel()
.map(number -> CompletableFuture.supplyAsync(
() -> DummyProcess.slowNetworkCall(number), customPool))
.map(CompletableFuture::join)
.mapToLong(Long::longValue)
.summaryStatistics();
}
// this method shows static pool size
public static void m2() {
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops)
.parallel()
.map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
.summaryStatistics();
}
public static Long slowNetworkCall(Long i) {
Instant start = Instant.now();
// starts with 11 (#cores in my laptop = 12), goes upto 64
log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(" {} woke up..", i);
return Duration.between(start, Instant.now()).toMillis();
}
示例输出:
16:07:17.443 [pool-2-thread-7] INFO generalworks.parallelism.DummyProcess - 44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO generalworks.parallelism.DummyProcess - 7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO generalworks.parallelism.DummyProcess - 6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO generalworks.parallelism.DummyProcess - 82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO generalworks.parallelism.DummyProcess - 26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO generalworks.parallelism.DummyProcess - 96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO generalworks.parallelism.DummyProcess - 78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO generalworks.parallelism.DummyProcess - 2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO generalworks.parallelism.DummyProcess - 36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO generalworks.parallelism.DummyProcess - 77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO generalworks.parallelism.DummyProcess - 86 going to sleep. poolsize: 64
我知道commonpool中的线程数,即 parallelism
是基于可用内核的最大数量,因此,由于我的笔记本电脑有12个内核,因此我得到的并行性为11和.但是我不明白为什么它会以一种方法继续攀登,但是在另一种方法中,它的大小保持不变
I understand that the number of Threads in a commonpool, i.e, it parallelism
is based upon max number of available cores, so since my laptop has 12 cores, i get a parallelism of 11 to start with. But I do not understand why it keeps climbing in one method, but in the other one, it's size keeps constants
推荐答案
I believe your answer is here (ForkJoinPool
implementation):
if ((wt = q.owner) != null &&
((ts = wt.getState()) == Thread.State.BLOCKED ||
ts == Thread.State.WAITING))
++bc; // worker is blocking
在一个代码版本中,您在 Thread.sleep
上阻塞,这会将线程置于 TIMED_WAITING
状态,而在另一个版本中,您在上阻塞CompletableFuture.join()
,将其置于 WAITING
状态.该实现将这些区别开来,并展现出您所观察到的不同行为.
In one version of your code, you block on Thread.sleep
, which puts the thread into the TIMED_WAITING
state, while in the other you block on CompletableFuture.join()
, which puts it into the WAITING
state. The implementation distinguishes between these and exhibits the different behaviors you have observed.
CompletableFuture
内还有特殊情况的代码,可使其与 ForkJoinPool
配合使用,以防止在等待结果时出现饥饿:
There is also special-cased code inside CompletableFuture
that makes it cooperate with the ForkJoinPool
in order to prevent starvation while waiting for the result:
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
与您首先进行测试的原因有关的结论: Thread.sleep()
不能正确模拟长时间的网络调用.如果您进行了实际的阻塞操作或其他阻塞操作,则可以通过扩展池来进行补偿.
A conclusion relevant to the reason why you're testing this in the first place: Thread.sleep()
does not properly simulate a long network call. If you did an actual one, or some other blocking operation, it would compensate by extending the pool.
这篇关于ForkJoinPool的大小动态增加?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!