Why is `parallelStream` faster than the `CompletableFuture` implementation?


Problem Description

I wanted to improve the performance of a certain operation in my backend REST API that polled multiple external APIs sequentially, collected their responses, and flattened them all into a single list of responses.

Having just recently learned about CompletableFuture, I decided to give it a go and compare that solution with the one that involved simply changing my stream to a parallelStream.

Here is the code used for the benchmark test:

    package com.alithya.platon;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;


    public class ConcurrentTest {

        static final List<String> REST_APIS =
                Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
        MyTestUtil myTest = new MyTestUtil();
        long millisBefore; // used to benchmark

        @BeforeEach
        void setUp() {
            millisBefore = System.currentTimeMillis();
        }

        @AfterEach
        void tearDown() {
            System.out.printf("time taken : %.4fs\n",
                    (System.currentTimeMillis() - millisBefore) / 1000d);
        }

        @Test
        void parallelSolution() { // 4s
            var parallel = REST_APIS.parallelStream()
                    .map(api -> myTest.collectOneRestCall())
                    .flatMap(List::stream)
                    .collect(Collectors.toList());

            System.out.println("List of responses: " + parallel.toString());
        }

        @Test
        void futureSolution() throws Exception { // 8s
            var futures = myTest.collectAllResponsesAsync(REST_APIS);

            System.out.println("List of responses: " + futures.get()); // only blocks here
        }

        @Test
        void originalProblem() { // 32s
            var sequential = REST_APIS.stream()
                    .map(api -> myTest.collectOneRestCall())
                    .flatMap(List::stream)
                    .collect(Collectors.toList());

            System.out.println("List of responses: " + sequential.toString());
        }
    }


    class MyTestUtil {

        public static final List<String> RESULTS = Arrays.asList("1", "2", "3", "4");

        List<String> collectOneRestCall() {
            try {
                TimeUnit.SECONDS.sleep(4); // simulating the await of the response
            } catch (Exception io) {
                throw new RuntimeException(io);
            } finally {
                return MyTestUtil.RESULTS; // always return something, for this demonstration
            }
        }

        CompletableFuture<List<String>> collectAllResponsesAsync(List<String> restApiUrlList) {

            /* Collecting the list of all the async requests that build a List<String>. */
            List<CompletableFuture<List<String>>> completableFutures = restApiUrlList.stream()
                    .map(api -> nonBlockingRestCall())
                    .collect(Collectors.toList());

            /* Creating a single Future that contains all the Futures we just created ("flatmap"). */
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures
                    .toArray(new CompletableFuture[restApiUrlList.size()]));

            /* When all the Futures have completed, we join them to create the merged List<String>. */
            CompletableFuture<List<String>> allCompletableFutures = allFutures
                    .thenApply(future -> completableFutures.stream()
                            .map(CompletableFuture::join)
                            .filter(Objects::nonNull) // filter out the failed calls, whose join() yields null
                            .flatMap(List::stream) // creating a List<String> from List<List<String>>
                            .collect(Collectors.toList())
                    );

            return allCompletableFutures;
        }

        private CompletableFuture<List<String>> nonBlockingRestCall() {
            /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
            return CompletableFuture.supplyAsync(() -> collectOneRestCall())
                    .exceptionally(ex -> {
                        return null; // gets managed in the wrapping Future
                    });
        }

    }

There is a list of 8 (fake) APIs. Each call takes 4 seconds to execute and returns a list of 4 entities (Strings in our case, for the sake of simplicity).

Results:

  1. stream: 32 seconds
  2. parallelStream: 4 seconds
  3. CompletableFuture: 8 seconds

I'm quite surprised, and expected the last two to be almost identical. What exactly is causing that difference? As far as I know, they both use ForkJoinPool.commonPool().

My naive interpretation would be that parallelStream, since it is a blocking operation, uses the actual main thread for its workload and thus has one extra active thread to work with, compared to the CompletableFuture solution, which is asynchronous and therefore cannot use that main thread.

Answer

CompletableFuture.supplyAsync() ends up using the common ForkJoinPool, which is initialized with a parallelism of Runtime.getRuntime().availableProcessors() - 1 (JDK 11 source).
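You can verify this directly on your own machine. A minimal sketch (the class name `PoolCheck` is illustrative):

```java
import java.util.concurrent.ForkJoinPool;

public class PoolCheck {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Defaults to cores - 1 (minimum 1), unless overridden via the
        // java.util.concurrent.ForkJoinPool.common.parallelism system property.
        int parallelism = ForkJoinPool.commonPool().getParallelism();
        System.out.println("cores = " + cores + ", common pool parallelism = " + parallelism);
    }
}
```

On an 8-core machine with default settings this reports a parallelism of 7.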

So it looks like you have an 8-processor machine, and therefore there are 7 threads in the pool.

There are 8 API calls, so only 7 can run at a time on the common pool. For the CompletableFuture test there will be 8 tasks running while your main thread blocks until they all complete. Seven of them can execute at once, which means the eighth has to wait 4 seconds for a free thread, giving a total of about 8 seconds.
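The "two waves" effect is easy to reproduce with a fixed-size pool and shortened sleeps; a sketch (class and method names are illustrative, with 200 ms standing in for the 4-second calls):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WaveDemo {

    /* Runs `tasks` sleeps of `taskMillis` on a pool of `poolSize` threads
       and returns the total wall-clock time in milliseconds. */
    static long runTasks(int tasks, int poolSize, long taskMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = IntStream.range(0, tasks)
                .mapToObj(i -> CompletableFuture.runAsync(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(taskMillis);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }, pool))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // 8 tasks on 7 threads: the eighth waits for a free thread -> two waves, >= 400 ms
        System.out.println("7 threads: " + runTasks(8, 7, 200) + " ms");
        // 8 tasks on 8 threads: a single wave, roughly 200 ms
        System.out.println("8 threads: " + runTasks(8, 8, 200) + " ms");
    }
}
```

With 7 threads the run takes at least two task durations; with 8 threads everything fits in one wave, which mirrors the 8 s vs. 4 s results above.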

parallelStream() uses this same thread pool, but the difference is that the first task is executed on the main thread that is running the stream's terminal operation, leaving 7 tasks to be distributed to the common pool. So in this scenario there are just enough threads to run everything in parallel. Try increasing the number of tasks to 9 and you will get an 8-second run time for your test.
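The caller's participation can be observed by recording which threads process the stream's elements; a sketch, assuming default common-pool settings (the class name `CallerRunsDemo` is illustrative):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /* Records the names of all threads that execute elements of a parallel stream. */
    static Set<String> usedThreads() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        List.of(1, 2, 3, 4, 5, 6, 7, 8).parallelStream()
                .forEach(i -> {
                    names.add(Thread.currentThread().getName());
                    try {
                        TimeUnit.MILLISECONDS.sleep(50); // encourage the pool to split the work
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
        return names;
    }

    public static void main(String[] args) {
        // Typically prints the calling thread ("main") alongside
        // ForkJoinPool.commonPool-worker-* names.
        System.out.println(usedThreads());
    }
}
```

The thread invoking the terminal operation helps execute the tasks, which is why parallelStream effectively has one more worker than supplyAsync in this benchmark.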
