返回包含CompletableFutures列表的CompletableFuture [英] Return a CompletableFuture containing a list of CompletableFutures

查看:154
本文介绍了返回包含CompletableFutures列表的CompletableFuture的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正试图使对多个API的调用更快.

I'm trying to make the calls to multiple APIs faster.

在下面的代码中, getFilteredEvents 是当前同步的版本.我感觉 map(x-> x.getFilteredEvents(eventResearch))操作将等待每个API的响应(它使用 RestTemplate.exchange()内部),然后再构建下一个要返回的 List< Event> .一种解决方案可能是在单独的线程上启动 map 调用,但我想尝试使用 CompletableFuture API.

In the code below, getFilteredEvents is the currently synchronous version. I have the feeling that the map(x -> x.getFilteredEvents(eventResearch)) operation will wait on the response of each API (it uses RestTemplate.exchange() internally) before passing onto the next one to build the List<Event> that I want to return. A solution would probably be to launch the map call on separate threads, but I wanted to try out the CompletableFuture API.

因此, getFilteredEventsFaster 是我努力改善响应时间的结果.

Thus, getFilteredEventsFaster is the result of my efforts to improve the response time.

@Service
public class EventsResearchService {

    @Autowired
    private List<UniformEventsResearchApi> eventsResearchApis;

    // this works, but I'm trying to improve it
    public EventResearchResponse getFilteredEvents(EventResearch eventResearch) {
        List<Event> eventsList = eventsResearchApis
                .stream()
                .map(x -> x.getFilteredEvents(eventResearch))
                .flatMap(List::stream)
                .collect(Collectors.toList());

        return extractResponse(eventResearch, eventsList);
    }

    // this doesn't work yet: what is wrong?
    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
        List<CompletableFuture<List<Event>>> futureEventsList = eventsResearchApis
                .parallelStream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getFilteredEvents(eventResearch)))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futureEventsList.toArray(new CompletableFuture<List<Event>>[0]));
    }
}

我的理解是我想将 CompletableFuture< List< Event>> 发送回我的前端,而不是将 List< CompletableFuture< List< Event>>; ,因此是 CompletableFuture.allOf()调用(如果我理解正确,它类似于 flatmap 操作,从而创建了 CompletableFuture 来自多个 CompleteableFuture s).

My understanding is that I would want to send a CompletableFuture<List<Event>> back to my frontend, rather than the List<CompletableFuture<List<Event>>>, hence the CompletableFuture.allOf() call (which, if I understood properly, resembles a flatmap operation, creating a CompletableFuture from multiple CompleteableFutures).

不幸的是,使用 new CompletableFuture< List< Event>> [0] 时,出现了通用数组创建编译错误.

Unfortunately, as it is, I get a Generic array creation compilation error when using new CompletableFuture<List<Event>>[0].

我在做什么错了?

我觉得使用 join 方法确实可以让我收集所有答案,但是那将是对Service线程的阻塞操作,不是吗?(如果我理解正确的话,这样做会破坏尝试将 CompletableFuture 返回到我的前端的目的.)

I have the feeling that using the join method would indeed allow me to collect all the answers, but that would be a blocking operation on the thread of my Service, wouldn't it? (Which would defeat the purpose of trying to return a CompletableFuture to my frontend, if I understand correctly.)

推荐答案

以下代码段显示了使用 listOfFutures.stream().map(CompletableFuture :: join)来收集 allOF .我已经从此页面中获取了此示例,该示例指出它不会等待每一个未来都要完成.

The following snippet shows the use of listOfFutures.stream().map(CompletableFuture::join) to collect the result of allOF. I have taken this example from this page that states that it wont wait for every Future to finish.

class Test {

    public static void main(String[] args) throws Exception {

        long millisBefore = System.currentTimeMillis();

        List<String> strings = Arrays.asList("1","2", "3", "4", "5", "6", "7", "8");
        List<CompletableFuture<String>> listOfFutures = strings.stream().map(Test::downloadWebPage).collect(toList());
        CompletableFuture<List<String>> futureOfList = CompletableFuture
                .allOf(listOfFutures.toArray(new CompletableFuture[0]))
                .thenApply(v ->  listOfFutures.stream().map(CompletableFuture::join).collect(toList()));

        System.out.println(futureOfList.get()); // blocks here
        System.out.printf("time taken : %.4fs\n", (System.currentTimeMillis() - millisBefore)/1000d);
    }

    private static CompletableFuture<String> downloadWebPage(String webPageLink) {
        return CompletableFuture.supplyAsync( () ->{
            try { TimeUnit.SECONDS.sleep(4); }
            catch (Exception io){ throw new RuntimeException(io); }
            finally { return "downloaded : "+ webPageLink; }
            });
    }

}

由于效率似乎是这里的问题,因此我添加了一个虚拟基准测试,以证明它不需要32秒即可执行.

Since efficiency seems to be the issue here, I have included a dummy benchmarck to prove it does not take 32 seconds to execute.

输出:

[downloaded : 1, downloaded : 2, downloaded : 3, downloaded : 4, downloaded : 5, downloaded : 6, downloaded : 7, downloaded : 8]
time taken : 8.0630s


原始问题海报中的编辑

通过此答案,并通过使用


EDIT from the original Question-Poster

Thanks to this answer, and through using this website (talks about exception handling related to allOf), I came up with this completed version:

    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {

        /* Collecting the list of all the async requests that build a List<Event>. */
        List<CompletableFuture<List<Event>>> completableFutures = eventsResearchApis.stream()
                .map(api -> getFilteredEventsAsync(api, eventResearch))
                .collect(Collectors.toList());

        /* Creating a single Future that contains all the Futures we just created ("flatmap"). */
        CompletableFuture<Void> allFutures =CompletableFuture.allOf(completableFutures
                .toArray(new CompletableFuture[eventsResearchApis.size()]));

        /* When all the Futures have completed, we join them to create merged List<Event>. */
        CompletableFuture<List<Event>> allCompletableFutures = allFutures
                .thenApply(future -> completableFutures.stream()
                            .map(CompletableFuture::join)
                            .flatMap(List::stream) // creating a List<Event> from List<List<Event>>
                            .collect(Collectors.toList())
                );

        return allCompletableFutures;
    }

    private CompletableFuture<List<Event>> getFilteredEventsAsync(UniformEventsResearchApi api,
            EventResearch eventResearch) {
        /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
        return CompletableFuture.supplyAsync(() -> api.getFilteredEvents(eventResearch))
                .exceptionally(ex -> {
                    LOGGER.error("Extraction of events from API went wrong: ", ex);
                    return Collections.emptyList(); // gets managed in the wrapping Future
                });
    }

这篇关于返回包含CompletableFutures列表的CompletableFuture的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆