List< Future>到未来< List>序列 [英] List<Future> to Future<List> sequence
问题描述
我想将列表< CompletableFuture< X>
转换为 CompletableFuture< List< T>< / code >。这是非常有用的,因为当你有很多异步任务,你需要得到所有的结果。
I am trying to convert List<CompletableFuture<X>>
to CompletableFuture<List<T>>
. This is quite useful as when you have many asynchronous tasks and you need to get results of all of them.
如果其中任何一个失败,则最后一个失败。这是我已经实现:
If any of them fails then the final future fails. This is how I have implemented:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
如果其中任何一个失败,则失败。即使有一百万的未来,它仍然能达到预期的产量。我有一个问题:如果有超过5000个期货,如果任何一个失败,我得到一个 StackOverflowError
:
If any of them fails then it fails. It gives output as expected even if there are a million futures. The problem I have is: Say if there are more than 5000 futures and if any of them fails, I get a StackOverflowError
:
线程中的异常pool-1-thread-2611java.lang.StackOverflowError
at
java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java :210)
at
java.util.concurrent.CompletableFuture $ ThenCompose.run(CompletableFuture.java:1487)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture .java:193)
at
java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at
java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java:1487)
Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
我做错了什么?
注意:上述返回的未来会在任何未来失败时失败。
Note: The above returned future fails right when any of the future fails. The accepted answer should also take this point.
使用 CompletableFuture.allOf(...)
/ code>:
Use CompletableFuture.allOf(...)
:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(toList())
);
}
对您的实现有几点意见:
A few comments on your implementation:
您使用 .thenComposeAsync
, .thenApplyAsync
和 .thenCombineAsync
可能不会做你期望的。这些 ... Async
方法在单独的线程中运行提供给它们的函数。所以,在你的情况下,你导致添加的新项目到列表中运行在提供的执行器。没有必要将轻量级操作填充到缓存线程执行器中。不要使用 thenXXXXAsync
方法没有很好的理由。
Your use of .thenComposeAsync
, .thenApplyAsync
and .thenCombineAsync
is likely not doing what you expect. These ...Async
methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync
methods without a good reason.
此外, reduce
不应用于累积到可变容器中。即使它可能在流是顺序时正常工作,如果流被并行,它将失败。要执行可变减少,请改用 .collect
。
Additionally, reduce
should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect
instead.
如果要完成整个计算,首先失败,在序列
方法中执行以下操作:
If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence
method:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;
如果此外,如果要在第一次失败时取消剩余操作,请添加 exec.shutdownNow(); 紧随
result.completeExceptionally(ex);
。这当然假设 exec
只存在这一个计算。如果没有,您必须循环并分别取消剩余的未来
。
If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow();
right after result.completeExceptionally(ex);
. This, of course, assumes that exec
only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future
individually.
这篇关于List< Future>到未来< List>序列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!