How to execute an array of CompletableFuture and compose their results

Question

I am investigating Java 8 CompletableFutures and have read (and seen in examples) that I should use thenCompose instead of thenApply.

I have converted my code to use thenCompose, but I have a feeling that I did it incorrectly.

Here is my controlling code:

final CompletableFuture<List<String>> extractor = get(htmlPageSource);
@SuppressWarnings("unchecked")
final CompletableFuture<List<Documentable>>[] completableFutures =
   new CompletableFuture[ENDPOINT.EXTRACTABLES.size()];
int index = 0;
for( ENDPOINT endpoint : ENDPOINT.EXTRACTABLES ) {
   final CompletableFuture<List<Documentable>> metaData =
      extractor.thenComposeAsync(
         s -> endpoint.contactEndpoit(s), executorService );
   completableFutures[index++] = metaData.exceptionally(x -> failedList(x));
}
CompletableFuture
   .allOf( completableFutures )
   .thenComposeAsync( dummy -> combineDocuments( completableFutures ))
   .thenAccept   ( x -> finish( x ))
   .exceptionally( x -> failed( x ));

private List<Documentable> failedList(final Throwable x) {
   LOGGER.error("failedList", x);
   final List<Documentable> metaData = new ArrayList<>();
   return metaData;
}

private Void failed(final Throwable x) {
   LOGGER.error("failed", x);
   return null;
}

I think this part is acceptable.

However, the code that makes me uneasy is this:

WWW_SITE_ONE("https://example.site.one/") {
   @Override
   public <T extends Documentable> CompletionStage<List<T>> contactEndpoit( final List<String> elements) {
      LOGGER.info("WWW_SITE_ONE " + Thread.currentThread().getName());
      final List<T> SITE_ONEs = new ArrayList<>();
      for (final String element : elements) {
         try {
            final String json = Jsoup.connect(ENDPOINT.WWW_SITE_ONE.getBaseUrl() + element).ignoreContentType(true).ignoreHttpErrors(true).maxBodySize(0).timeout(60000).execute().body();
            if (json.contains("errors")) {
               continue;
            }
            final T SITE_ONE = OBJECT_READER_SITE_ONE.readValue(json);
            SITE_ONEs.add(SITE_ONE);
         }
         catch( final Throwable e ) {
            LOGGER.error("WWW_SITE_ONE failed", e);
            throw new RuntimeException(e);
         }
      }
      return CompletableFuture.supplyAsync(() -> SITE_ONEs);  
   }
},
WWW_SITE_TWO("https://example.site.two/") {
   @Override
   public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
      LOGGER.info("WWW_SITE_TWO " + Thread.currentThread().getName());      
      final List<T> SITE_TWOs = new ArrayList<>();
      for (final String element : elements) {
         try {
            final String json = Jsoup.connect(ENDPOINT.WWW_SITE_TWO.getBaseUrl() + element).ignoreContentType(true).ignoreHttpErrors(true).maxBodySize(0).timeout(60000).execute().body();
            if (json.equals("Resource not found.")) {
               continue;
            }
            final T SITE_TWO = OBJECT_READER_SITE_TWO.readValue(json);
            SITE_TWOs.add(SITE_TWO);
         }
         catch (final Throwable e) {
            LOGGER.error("WWW_SITE_TWO failed", e);
            throw new RuntimeException(e);
         }
      }
      return CompletableFuture.supplyAsync(() -> SITE_TWOs);  
   }
},
WWW_SITE_THREE("https://example.site.three/") {
   @Override
   public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
      LOGGER.info("WWW_SITE_THREE " + Thread.currentThread().getName());        
      final List<T> SITE_THREEs = new ArrayList<>();
      for (final String element : elements) {
         try {
            final String SITE_THREEJsonString = Jsoup
               .connect( ENDPOINT.WWW_SITE_THREE.getBaseUrl() + element)
               .ignoreContentType(true)
               .ignoreHttpErrors(true)
               .maxBodySize(0)
               .timeout(60000)
               .execute()
               .body();
            final SITE_THREE SITE_THREE_Json = OBJECT_READER_SITE_THREE.readValue(SITE_THREEJsonString);
            final T SITE_THREE = (T) SITE_THREE_Json;
            if (SITE_THREE_Json.getHitCount() > 0) {
               SITE_THREEs.add(SITE_THREE);
            }
         }
         catch (final Throwable e) {
            LOGGER.error("WWW_SITE_THREE failed", e);
            throw new RuntimeException(e);
         }
      }
      return CompletableFuture.supplyAsync(() -> SITE_THREEs);  
   }
};

It is the return statement I am unsure about: return CompletableFuture.supplyAsync(() -> SITE_THREEs);

Is this the correct approach?

Or does this start another asynchronous thread simply to return my List<>?

Recommended answer

As the name suggests, supplyAsync performs an asynchronous operation: it executes the Supplier's get() method, and hence the body of the lambda expression, in a background thread, regardless of how trivial that body is. Since the implementation of supplyAsync has no way to check how trivial the code encapsulated by the Supplier is, it has to work this way.
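The behaviour described above can be observed with a small sketch. The class and method names here are hypothetical and not part of the original question; the only assumption is the default executor that supplyAsync uses when none is passed.

```java
import java.util.concurrent.CompletableFuture;

public class SupplyAsyncThreadDemo {

    // Returns the thread that actually executed the Supplier body.
    static Thread supplierThread() {
        // Even a trivial Supplier is handed to the default asynchronous executor.
        CompletableFuture<Thread> future =
            CompletableFuture.supplyAsync(() -> Thread.currentThread());
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("supplier thread: " + supplierThread().getName());
    }
}
```

When run from main, the two printed names should differ, which shows that the lambda was dispatched to another thread even though it did no real work.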

Instead of CompletableFuture.supplyAsync(() -> SITE_THREEs), you should use CompletableFuture.completedFuture(SITE_THREEs), which returns a future that has already been completed with the result and therefore requires no additional action.
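A minimal sketch of that change, assuming a simplified stand-in for the endpoint method: contactEndpoint, the "doc:" prefix, and the String element type are all hypothetical placeholders for the real HTTP call and Documentable objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompletedFutureDemo {

    // Hypothetical stand-in for the endpoint method: the work is done
    // synchronously, then the finished list is wrapped in a future that
    // is already complete, so no extra task is scheduled.
    static CompletionStage<List<String>> contactEndpoint(List<String> elements) {
        List<String> results = new ArrayList<>();
        for (String element : elements) {
            results.add("doc:" + element); // placeholder for the blocking HTTP request
        }
        return CompletableFuture.completedFuture(results);
    }

    // Chains the stand-in after an asynchronous extraction step using thenCompose.
    static List<String> run() {
        CompletableFuture<List<String>> extractor =
            CompletableFuture.supplyAsync(() -> Arrays.asList("a", "b"));
        return extractor.thenCompose(CompletedFutureDemo::contactEndpoint).join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The calling code keeps using thenCompose unchanged; only the way the result is wrapped inside the method differs.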

If the method only ever returns completed stages or throws an exception, you may also change it to return the result value instead of a CompletionStage and use thenApply instead of thenCompose. This simplifies your code, unless you want to keep the option of introducing asynchronous operations in a future version of that method.
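The thenApply variant might look roughly like the following sketch. As before, contactEndpoint and the String element type are hypothetical simplifications; the exceptionally fallback mirrors the empty-list idea of failedList in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ThenApplyDemo {

    // Hypothetical synchronous variant: returns the value itself, not a CompletionStage.
    static List<String> contactEndpoint(List<String> elements) {
        List<String> results = new ArrayList<>();
        for (String element : elements) {
            results.add("doc:" + element); // placeholder for the blocking HTTP request
        }
        return results;
    }

    // thenApply maps the extracted list through the plain function; an exception
    // thrown inside it completes the stage exceptionally and triggers the fallback.
    static List<String> run() {
        return CompletableFuture
            .supplyAsync(() -> Arrays.asList("a", "b"))
            .thenApply(ThenApplyDemo::contactEndpoint)
            .exceptionally(x -> new ArrayList<>())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

To run the function on a specific executor, as the question does with executorService, thenApplyAsync accepts the executor as a second argument in the same way thenComposeAsync does.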
