CompletableFuture on ParallelStream gets batched and runs slower than sequential stream?


Problem description

Approach 1

The usual approach: very fast, and works great.

public static int loops = 500;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .collect(Collectors.toList()).stream() // collect first, else will be sequential
        .map(CompletableFuture::join)
        .mapToLong(Long::longValue)
        .summaryStatistics();

log.info("cf completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cf completed in :: 1054, summaryStats :: LongSummaryStatistics{count=500, sum=504008, min=1000, average=1008.016000, max=1017} 

I understand that if I don't collect the stream first, then by the nature of laziness the stream will create the CompletableFutures one by one and behave synchronously. So, as an experiment:
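
For comparison, here is the naive single-pipeline version that motivates the experiment: a minimal sketch, assuming the same loops, customPool, log and DummyProcess.slowNetworkCall as in Approach 1. Because join() blocks before the lazy pipeline pulls the next element, each future is created and awaited one at a time, so the whole run takes roughly loops seconds.

Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join) // blocks here, so the next future is only created after this one completes
        .mapToLong(Long::longValue)
        .summaryStatistics();

log.info("naive completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);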

Approach 2

Remove the in-between collection step, but also make the stream parallel:

Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join) // direct join
        .mapToLong(Long::longValue).summaryStatistics();

log.info("cfps_directJoin completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cfps_directJoin completed in :: 8098, summaryStats :: LongSummaryStatistics{count=500, sum=505002, min=1000, average=1010.004000, max=1015}

Summary:

  • Approach 1 :: 1 second
  • Approach 2 :: 8 seconds

The pattern I observe:

  1. the parallel stream approach "batches" 60 calls at once, so with 500 loops, 500/60 ~ 8 batches, each taking 1 second, thus 8 seconds total (a way to verify this is sketched right after this list)
  2. so, when I reduce the loop count to 300, there are 300/60 = 5 batches, and it indeed takes 5 seconds to complete.
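
One way to see the batch size directly is to count how many calls are asleep at the same time. A minimal sketch, reusing the dummy slowNetworkCall shown further below; the AtomicInteger counter is my own instrumentation, not part of the original code:

    private static final AtomicInteger inFlight = new AtomicInteger(); // hypothetical counter, added only to observe concurrency

    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        log.info(" {} going to sleep.. ({} currently in flight)", i, inFlight.incrementAndGet());
        try {
            TimeUnit.MILLISECONDS.sleep(1000); // 1 second
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up.. ({} still in flight)", i, inFlight.decrementAndGet());
        return Duration.between(start, Instant.now()).toMillis();
    }

With Approach 2 the in-flight count should plateau at roughly the batch size rather than ever reaching 500.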

So, the question is:

Why is there this batching of calls in the parallel + direct collection approach?

For completeness, here's my dummy network call method:

    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        log.info(" {} going to sleep..", i);
        try {
            TimeUnit.MILLISECONDS.sleep(1000); // 1 second
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }

Answer

This is an artifact of how ForkJoinPool handles things when you block its inner threads, and of how many new ones it spawns. Though I could probably find the exact lines where this happens, I am not sure it is worth it, for two reasons:

  • that logic can change

  • the code inside ForkJoinPool is far from trivial

It seems that for both of us, ForkJoinPool.commonPool().getParallelism() returns 11, so I get the same results as you do. If you log ForkJoinPool.commonPool().getPoolSize() to find out how many active threads your code is using, you will see that after a certain period it just stabilizes at 64. So the maximum number of tasks that can be processed at the same time is 64, which is on par with the result that you see (those 8 seconds).
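
If you want to watch that stabilization happen, one option is to sample the common pool while the parallel pipeline runs. A minimal sketch, assuming the same loops, customPool and slowNetworkCall as above; the scheduled sampler is my own addition, not part of the original answer:

ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(
        () -> log.info("parallelism={}, poolSize={}, active={}",
                ForkJoinPool.commonPool().getParallelism(),
                ForkJoinPool.commonPool().getPoolSize(),
                ForkJoinPool.commonPool().getActiveThreadCount()),
        0, 500, TimeUnit.MILLISECONDS); // sample every 500 ms

LongStream.range(0, loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join)
        .mapToLong(Long::longValue)
        .summaryStatistics();

monitor.shutdown(); // poolSize should climb as blocked threads are compensated for, then settle around 64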

If I run your code with -Djava.util.concurrent.ForkJoinPool.common.parallelism=50, it now executes in 2 seconds, and the pool size increases to 256. That means there is internal logic that adjusts this kind of thing.
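
For reference, the property can be passed on the command line or set programmatically, as long as that happens before anything touches the common pool. A sketch, with BenchmarkMain standing in as a placeholder for whatever class holds the code above:

// Command line (BenchmarkMain is a placeholder name):
//   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=50 BenchmarkMain
//
// Or programmatically, before the common pool is first used:
public static void main(String[] args) {
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "50");
    log.info("common pool parallelism :: {}", ForkJoinPool.commonPool().getParallelism()); // should now report 50
    // ... run Approach 2 from above
}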

