CompletableFuture循环:如何收集所有响应并处理错误 [英] CompletableFuture in loop: How to collect all responses and handle errors
问题描述
我试图在循环中为 PUT
请求调用rest api.每个调用都是一个 CompletableFuture
.每次api调用都会返回 RoomTypes.RoomType
I am trying to call a rest api for PUT
request in a loop. Each call is a CompletableFuture
. Each api call returns an object of type RoomTypes.RoomType
-
我想收集响应(成功和错误都回复)在不同的列表中.我该如何实现?我确定我无法使用
allOf
,因为如果有的话,它不会获得所有结果一个呼叫无法更新.
I want to collect the responses (both successful and error responses) in different lists. How do I achieve that? I am sure I cannot use
allOf
because it would not get all the results if any one call fails to update.
如何记录每个呼叫的错误/异常?
How do I log errors/exception for each call?
public void sendRequestsAsync(Map<Integer, List> map1) {
List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (Map.Entry<Integer, List> entry :map1.entrySet()) {
CompletableFuture requestCompletableFuture = CompletableFuture
.supplyAsync(
() ->
//API call which returns object of type RoomTypes.RoomType
updateService.updateRoom(51,33,759,entry.getKey(),
new RoomTypes.RoomType(entry.getKey(),map2.get(entry.getKey()),
entry.getValue())),
yourOwnExecutor
)//Supply the task you wanna run, in your case http request
.thenApply(responses::add);
completableFutures.add(requestCompletableFuture);
}
推荐答案
您可以简单地使用 allOf()
来获得在所有初始期货都完成后(无论是否异常)完成的期货,然后使用
You can simply use allOf()
to get a future that is completed when all your initial futures are completed (exceptionally or not), and then split them between succeeded and failed using Collectors.partitioningBy()
:
List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (Map.Entry<Integer, List> entry : map1.entrySet()) {
CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
.supplyAsync(
() ->
//API call which returns object of type RoomTypes.RoomType
updateService.updateRoom(51, 33, 759, entry.getKey(),
new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
entry.getValue())),
yourOwnExecutor
);
completableFutures.add(requestCompletableFuture);
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
// avoid throwing an exception in the join() call
.exceptionally(ex -> null)
.join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
completableFutures.stream()
.collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally)));
生成的地图将包含一个带有失败的期货的 true
条目,以及另一个带有 false
密钥的条目.然后,您可以检查2个条目以采取相应的措施.
The resulting map will contain one entry with true
for the failed futures, and another entry with false
key for the succeeded ones. You can then inspect the 2 entries to act accordingly.
请注意,与原始代码相比,有2个细微变化:
Note that there are 2 slight changes compared to your original code:
-
requestCompletableFuture
现在是一个CompletableFuture< RoomTypes.RoomType>
-
thenApply(responses :: add)
和responses
列表已删除
requestCompletableFuture
is now aCompletableFuture<RoomTypes.RoomType>
thenApply(responses::add)
and theresponses
list were removed
关于日志记录/异常处理,只需添加相关的 requestCompletableFuture.handle()
即可分别记录它们,但保留 requestCompletableFuture
而不是由生成的记录handle()
.
Concerning logging/exception handling, just add the relevant requestCompletableFuture.handle()
to log them individually, but keep the requestCompletableFuture
and not the one resulting from handle()
.
这篇关于CompletableFuture循环:如何收集所有响应并处理错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!