使用CompletableFuture重试逻辑 [英] Retry logic with CompletableFuture
问题描述
我需要在正在处理的异步框架中提交任务,但是我需要捕捉异常,并在中止之前多次重试同一任务。
I need to submit a task in an async framework I'm working on, but I need to catch for exceptions, and retry the same task multiple times before "aborting".
我正在使用的代码是:
int retries = 0;
public CompletableFuture<Result> executeActionAsync() {
// Execute the action async and get the future
CompletableFuture<Result> f = executeMycustomActionHere();
// If the future completes with exception:
f.exceptionally(ex -> {
retries++; // Increment the retry count
if (retries < MAX_RETRIES)
return executeActionAsync(); // <--- Submit one more time
// Abort with a null value
return null;
});
// Return the future
return f;
}
由于lambda的返回类型错误,当前无法编译:它期望 Result
,但是 executeActionAsync
返回 CompletableFuture< Result>
。
This currently doesn't compile because the return type of the lambda is wrong: it expects a Result
, but the executeActionAsync
returns a CompletableFuture<Result>
.
如何实现这种完全异步的重试逻辑?
How can I implement this fully async retry logic?
推荐答案
我想我成功了。这是我创建的示例类和测试代码:
I think I was successfully. Here's an example class I created and the test code:
public class RetriableTask
{
protected static final int MAX_RETRIES = 10;
protected int retries = 0;
protected int n = 0;
protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();
public RetriableTask(int number) {
n = number;
}
public CompletableFuture<Integer> executeAsync() {
// Create a failure within variable timeout
Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);
// Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts.
// In real application this should be a real future
final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
if (n > 5 && retries > 5)
taskFuture.complete(retries * n);
// Attach the failure future to the task future, and perform a check on completion
taskFuture.applyToEither(timeoutFuture, Function.identity())
.whenCompleteAsync((result, exception) -> {
if (exception == null) {
future.complete(result);
} else {
retries++;
if (retries >= MAX_RETRIES) {
future.completeExceptionally(exception);
} else {
executeAsync();
}
}
});
// Return the future
return future;
}
}
用法
Usage
int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
tasks.add(new RetriableTask(i));
}
System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
futures.add(tasks.get(i).executeAsync());
}
System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
try {
CompletableFuture<Integer> future = futures.get(i);
int result = future.get();
System.out.println(i + " result is " + result);
} catch (Exception ex) {
System.out.println(i + " I got exception!");
}
}
System.out.println("Done waiting...");
输出
Output
generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...
主要思想和一些粘合代码( failAfter
函数)来自此处。
欢迎其他任何建议或改进。
Any other suggestions or improvement are welcome.
这篇关于使用CompletableFuture重试逻辑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!