Java 8 CompletableFuture中具有默认值的超时 [英] Timeout with default value in Java 8 CompletableFuture
问题描述
假设我有一些异步计算,例如:
Suppose I have some async computation, such as:
CompletableFuture
.supplyAsync(() -> createFoo())
.thenAccept(foo -> doStuffWithFoo(foo));
如果异步供应商根据某些情况超时,是否有一种很好的方法可以为foo提供默认值指定超时?理想情况下,此类功能也会尝试取消运行缓慢的供应商。例如,是否存在类似于以下假设代码的标准库功能:
Is there a nice way to provide a default value for foo if the async supplier times out according to some specified timeout? Ideally, such functionality would attempt to cancel the slow-running supplier as well. For example, is there standard library functionality that is similar to the following hypothetical code:
CompletableFuture
.supplyAsync(() -> createFoo())
.acceptEither(
CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
foo -> doStuffWithFoo(foo));
或者甚至更好:
CompletableFuture
.supplyAsync(() -> createFoo())
.withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
.thenAccept(foo -> doStuffWithFoo(foo));
我知道 get(超时,单位)
,但我想知道是否有一种更好的标准方式以上述代码中的建议以异步和反应方式应用超时。
I know about get(timeout, unit)
, but am wondering if there's a nicer standard way of applying a timeout in an asynchronous and reactive fashion as suggested in the code above.
编辑:这是一个受到启发的解决方案通过 Java 8:在lambda中强制检查异常处理表达式。为什么是强制性的,而不是可选的?,但不幸的是它阻止了一个线程。如果我们依赖createFoo()异步检查超时并抛出自己的超时异常,它可以在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(可以没有快速投掷就很昂贵)
Here's a solution that's inspired by Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional?, but unfortunately it blocks a thread. If we rely on createFoo() to asynchronously check for timeout and throw its own timeout exception it would work without blocking a thread, but would place more burden on the creator of the supplier and would still have the cost of creating an exception (which can be expensive without "fast throw")
static <T> Supplier<T> wrapped(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException e1) {
throw e1;
} catch (Throwable e2) {
throw new RuntimeException(e2);
}
};
}
CompletableFuture
.supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
.exceptionally(e -> "default")
.thenAcceptAsync(s -> doStuffWithFoo(foo));
推荐答案
CompletableFuture.supplyAsync只是一个帮助方法,可以创建一个CompletableFuture为您服务,并将任务提交给ForkJoin池。
CompletableFuture.supplyAsync is just a helper method that creates a CompletableFuture for you, and submits the task to the ForkJoin Pool.
您可以按照以下要求创建自己的supplyAsync:
You can create your own supplyAsync with your requirements like this:
private static final ScheduledExecutorService schedulerExecutor =
Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService =
Executors.newCachedThreadPool();
public static <T> CompletableFuture<T> supplyAsync(
final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
T defaultValue) {
final CompletableFuture<T> cf = new CompletableFuture<T>();
// as pointed out by Peti, the ForkJoinPool.commonPool() delivers a
// ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
// Using Executors.newCachedThreadPool instead in the example
// submit task
Future<?> future = executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});
//schedule watcher
schedulerExecutor.schedule(() -> {
if (!cf.isDone()) {
cf.complete(defaultValue);
future.cancel(true);
}
}, timeoutValue, timeUnit);
return cf;
}
使用该帮助器创建CompletableFuture就像在CompletableFuture中使用静态方法一样简单:
Creating the CompletableFuture with that helper is as easy as using the static method in CompletableFuture:
CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
TimeUnit.SECONDS, "default");
测试它:
a = supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
// ignore
}
return "hi";
}, 1, TimeUnit.SECONDS, "default");
这篇关于Java 8 CompletableFuture中具有默认值的超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!