如何在同一个Java流中正确提交和获取多个Futures? [英] How to properly submit and get several Futures in the same Java stream?

查看:101
本文介绍了如何在同一个Java流中正确提交和获取多个Futures?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试在同一个流中提交并获得10 Future 。每一个都需要1秒钟来处理,我想并行运行它们。

I try to submit and get 10 Futures in the same stream. Each one takes 1 second to process and I would like to run them in parallel.

我的第一次尝试是 takes_10_sec()顺序运行并需要10秒。

My first try is takes_10_sec() which runs sequentially and takes 10s.

我的第二次尝试是 take_1_sec()并行运行1秒。但是它使用了一个中间 .collect(Collectors.toList())。stream()我觉得这不是一个好方法。

My second try is takes_1_sec() which runs in parallel and takes 1s. However it uses an intermediate .collect(Collectors.toList()).stream() which I don't think is a good way to do it.

还有其他推荐的方法吗?

Is there another recommended way?

public class FutureStream {
    private ExecutorService executor = Executors.newFixedThreadPool(10);;

    @Test
    public void takes_10_sec() {
        IntStream.range(0, 10)
                .mapToObj(i -> longTask())
                .map(task -> {
                    try {
                        return task.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .forEach(System.out::println);
    }

    @Test
    public void takes_1_sec() {
        IntStream.range(0, 10)
                .mapToObj(i -> longTask())
                .collect(Collectors.toList())
                .stream()
                .map(task -> {
                    try {
                        return task.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .forEach(System.out::println);
    }

    private Future<String> longTask() {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return Thread.currentThread().getName();
        });
    }
}


推荐答案

Streams是懒惰的,只会根据终端操作的需要处理元素。对于每个元素,在开始下一个元素之前处理整个管道(并行流除外)。例如,这种技术允许短路操作。

Streams are lazy and will only process elements as needed by the terminal operation. For each element, the whole pipeline is processed before starting with the next element (except for parallel streams). This technique allows short-circuiting operations for example.

因为你有一个阻止的中间 map()操作处理创建的未来的结果,处理它将等待每个未来在创建下一个未来之前完成。

Since you have an intermediate map() operation that blocks on the result of the created future, processing it will wait for each future to complete before creating the next one.

收集所有未来确保所有未来都创建第一。这是适当的解决方案,因为您需要确保在处理结果之前处理整个流。

Collecting them all as you do makes sure that all futures are created first. This is the appropriate solution as you need to make sure that the whole stream is processed before handling the results.

这篇关于如何在同一个Java流中正确提交和获取多个Futures?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆