列表&lt;未来&gt;到未来<列表>Java中的序列 [英] List&lt;Future&gt; to Future&lt;List&gt; sequence in Java

查看:49
本文介绍了列表&lt;未来&gt;到未来<列表>Java中的序列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将 List> 转换为 CompletableFuture>.当您有许多异步任务并且需要获取所有任务的结果时,这非常有用.

I am trying to convert List<CompletableFuture<X>> to CompletableFuture<List<T>>. This is quite useful as when you have many asynchronous tasks and you need to get results of all of them.

如果其中任何一个失败,那么最终的未来就会失败.这就是我实施的方式:

If any of them fails then the final future fails. This is how I have implemented:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }

运行:

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

如果其中任何一个失败,那么它就会失败.即使有 100 万个期货,它也能按预期提供输出.我遇到的问题是:假设有超过 5000 个期货,如果其中任何一个失败,我会收到 StackOverflowError:

If any of them fails then it fails. It gives output as expected even if there are a million futures. The problem I have is: Say if there are more than 5000 futures and if any of them fails, I get a StackOverflowError:

线程pool-1-thread-2611"中的异常java.lang.StackOverflowError在java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)在java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)在java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)在java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)在java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

我做错了什么?

注意:当任何未来失败时,上述返回的未来也会失败.接受的答案也应该考虑这一点.

Note: The above returned future fails right when any of the future fails. The accepted answer should also take this point.

推荐答案

使用 CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

关于您的实施的一些评论:

A few comments on your implementation:

您对 .thenComposeAsync.thenApplyAsync.thenCombineAsync 的使用可能没有达到您的预期.这些 ...Async 方法在单独的线程中运行提供给它们的函数.因此,在您的情况下,您导致将新项目添加到列表中以在提供的执行程序中运行.无需将轻量级操作填充到缓存的线程执行器中.不要在没有充分理由的情况下使用 thenXXXXAsync 方法.

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.

此外,reduce 不应用于累积到可变容器中.即使流是顺序流时它可能会正常工作,但如果流是并行的,它也会失败.要执行可变归约,请改用 .collect.

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect instead.

如果您想在第一次失败后立即异常地完成整个计算,请在您的sequence方法中执行以下操作:

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

另外,如果您想在第一次失败时取消剩余的操作,请在 result.completeExceptionally(ex); 之后添加 exec.shutdownNow();.当然,这假设 exec 只存在于这一计算中.如果没有,您将不得不循环并单独取消每个剩余的 Future.

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec only exist for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.

这篇关于列表&lt;未来&gt;到未来<列表>Java中的序列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆