ForkJoinPool的大小动态增加? [英] ForkJoinPool size increasing dynamically?

查看:284
本文介绍了ForkJoinPool的大小动态增加?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

相关: ParallelStream上的CompletableFuture被批处理并且比顺序流运行得慢吗?

我正在研究通过parallelStream和CompletableFutures并行化网络调用的不同方式.因此,我遇到了这种情况,Java的parallelStream使用的ForkJoinPool.commonPool()的大小正在动态增长,从〜#Cores到最大值64.

I'm doing some research on different ways of parallelizing network calls through parallelStream and CompletableFutures. As such, I have come across this situation where the ForkJoinPool.commonPool(), which is used by java's parallelStream, is dynamically growing in size, from ~ #Cores, to Max value of 64.

Java详细信息: $ java -version

Java details: $ java -version

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)

下面显示了这种行为的代码(完整的可执行代码此处)

Code that shows such behavior is below (Full executable code here)


    public static int loops = 100;
    private static long sleepTimeMs = 1000;
    private static ExecutorService customPool = Executors.newFixedThreadPool(loops);




    // this method shows dynamic increase in pool size
    public static void m1() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
                .parallel()
                .map(number -> CompletableFuture.supplyAsync(
                        () -> DummyProcess.slowNetworkCall(number), customPool))
                .map(CompletableFuture::join)
                .mapToLong(Long::longValue)
                .summaryStatistics();

    }

    // this method shows static pool size
    public static void m2() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops)
                .parallel()
                .map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
                .summaryStatistics();
    }


    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        // starts with 11 (#cores in my laptop = 12), goes upto 64
        log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
        try {
            TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }

示例输出:

16:07:17.443 [pool-2-thread-7] INFO  generalworks.parallelism.DummyProcess -  44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO  generalworks.parallelism.DummyProcess -  7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO  generalworks.parallelism.DummyProcess -  6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO  generalworks.parallelism.DummyProcess -  82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO  generalworks.parallelism.DummyProcess -  26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO  generalworks.parallelism.DummyProcess -  96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO  generalworks.parallelism.DummyProcess -  78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO  generalworks.parallelism.DummyProcess -  2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO  generalworks.parallelism.DummyProcess -  36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO  generalworks.parallelism.DummyProcess -  77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO  generalworks.parallelism.DummyProcess -  86 going to sleep. poolsize: 64

我知道commonpool中的线程数,即 parallelism 是基于可用内核的最大数量,因此,由于我的笔记本电脑有12个内核,因此我得到的并行性为11和.但是我不明白为什么它会以一种方法继续攀登,但是在另一种方法中,它的大小保持不变

I understand that the number of Threads in a commonpool, i.e, it parallelism is based upon max number of available cores, so since my laptop has 12 cores, i get a parallelism of 11 to start with. But I do not understand why it keeps climbing in one method, but in the other one, it's size keeps constants

推荐答案

我相信您的答案是

I believe your answer is here (ForkJoinPool implementation):

                        if ((wt = q.owner) != null &&
                            ((ts = wt.getState()) == Thread.State.BLOCKED ||
                             ts == Thread.State.WAITING))
                            ++bc;            // worker is blocking

在一个代码版本中,您在 Thread.sleep 上阻塞,这会将线程置于 TIMED_WAITING 状态,而在另一个版本中,您在上阻塞CompletableFuture.join(),将其置于 WAITING 状态.该实现将这些区别开来,并展现出您所观察到的不同行为.

In one version of your code, you block on Thread.sleep, which puts the thread into the TIMED_WAITING state, while in the other you block on CompletableFuture.join(), which puts it into the WAITING state. The implementation distinguishes between these and exhibits the different behaviors you have observed.

CompletableFuture 内还有特殊情况的代码,可使其与 ForkJoinPool 配合使用,以防止在等待结果时出现饥饿:

There is also special-cased code inside CompletableFuture that makes it cooperate with the ForkJoinPool in order to prevent starvation while waiting for the result:

            if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);

与您首先进行测试的原因有关的结论: Thread.sleep()不能正确模拟长时间的网络调用.如果您进行了实际的阻塞操作或其他阻塞操作,则可以通过扩展池来进行补偿.

A conclusion relevant to the reason why you're testing this in the first place: Thread.sleep() does not properly simulate a long network call. If you did an actual one, or some other blocking operation, it would compensate by extending the pool.

这篇关于ForkJoinPool的大小动态增加?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆