List< Future>到未来< List>序列 [英] List<Future> to Future<List> sequence

查看:150
本文介绍了List< Future>到未来< List>序列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想将列表< CompletableFuture< X> 转换为 CompletableFuture< List< T>< / code >。这是非常有用的,因为当你有很多异步任务,你需要得到所有的结果。

I am trying to convert List<CompletableFuture<X>> to CompletableFuture<List<T>>. This is quite useful as when you have many asynchronous tasks and you need to get results of all of them.

如果其中任何一个失败,则最后一个失败。这是我已经实现:

If any of them fails then the final future fails. This is how I have implemented:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }

运行它:

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

如果其中任何一个失败,则失败。即使有一百万的未来,它仍然能达到预期的产量。我有一个问题:如果有超过5000个期货,如果任何一个失败,我得到一个 StackOverflowError

If any of them fails then it fails. It gives output as expected even if there are a million futures. The problem I have is: Say if there are more than 5000 futures and if any of them fails, I get a StackOverflowError:


线程中的异常pool-1-thread-2611java.lang.StackOverflowError
at
java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java :210)
at
java.util.concurrent.CompletableFuture $ ThenCompose.run(CompletableFuture.java:1487)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture .java:193)
at
java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at
java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java:1487)

Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

我做错了什么?

注意:上述返回的未来会在任何未来失败时失败。

Note: The above returned future fails right when any of the future fails. The accepted answer should also take this point.

使用 CompletableFuture.allOf(...)

解决方案

/ code>:

Use CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(toList())
            );
}

对您的实现有几点意见:

A few comments on your implementation:

您使用 .thenComposeAsync .thenApplyAsync .thenCombineAsync 可能不会做你期望的。这些 ... Async 方法在单独的线程中运行提供给它们的函数。所以,在你的情况下,你导致添加的新项目到列表中运行在提供的执行器。没有必要将轻量级操作填充到缓存线程执行器中。不要使用 thenXXXXAsync 方法没有很好的理由。

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.

此外, reduce 不应用于累积到可变容器中。即使它可能在流是顺序时正常工作,如果流被并行,它将失败。要执行可变减少,请改用 .collect

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect instead.

如果要完成整个计算,首先失败,在序列方法中执行以下操作:

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

如果此外,如果要在第一次失败时取消剩余操作,请添加 exec.shutdownNow(); 紧随 result.completeExceptionally(ex); 。这当然假设 exec 只存在这一个计算。如果没有,您必须循环并分别取消剩余的未来

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.

这篇关于List&lt; Future&gt;到未来&lt; List&gt;序列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆