Mono vs CompletableFuture [英] Mono vs CompletableFuture
问题描述
CompletableFuture
在单独的线程(使用线程池)上执行任务,并提供回调函数。假设我在 CompletableFuture
中有一个API调用。这是API调用阻止吗?线程会被阻塞,直到无法从API获得响应为止? (我知道主线程/ tomcat线程将是非阻塞的,但是执行CompletableFuture任务的线程又如何呢?)
CompletableFuture
executes a task on a separate thread ( uses a thread-pool ) and provides a callback function. Let's say I have an API call in a CompletableFuture
. Is that an API call blocking? Would the thread be blocked till it does not get a response from the API? ( I know main thread/tomcat thread will be non-blocking, but what about the thread on which CompletableFuture task is executing? )
Mono完全是非阻塞的,因为据我所知。
Mono is completely non-blocking, as far as I know.
请对此加以说明,如果我错了,请纠正我。
Please shed some light on this and correct me if I am wrong.
推荐答案
CompletableFuture是异步的。
关于CompletableFuture的一个事实是它确实是异步的,它允许您从调用者线程和API异步运行任务例如 thenXXX
允许您在结果可用时进行处理。另一方面, CompletableFuture
并非总是非阻塞的。例如,当您运行以下代码时,它将在默认的 ForkJoinPool
上异步执行:
CompletableFuture is Async. But is it non-blocking?
One which is true about CompletableFuture is that it is truly async, it allows you to run your task asynchronously from the caller thread and the API such as thenXXX
allows you to process the result when it becomes available. On the other hand, CompletableFuture
is not always non-blocking. For example, when you run the following code, it will be executed asynchronously on the default ForkJoinPool
:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
很明显,<< c中的 Thread
code> ForkJoinPool 执行该任务最终将被阻止,这意味着我们不能保证该调用不会被阻止。
It is clear that the Thread
in ForkJoinPool
that executes the task will be blocked eventually which means that we can't guarantee that the call will be non-blocking.
另一方面, CompletableFuture
公开了API,使您可以使其真正成为非阻塞。
On the other hand, CompletableFuture
exposes API which allows you to make it truly non-blocking.
例如,您始终可以执行以下操作:
For example, you can always do the following:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
如您所见, CompletableFuture
的API为您提供了 complete
和 completeExceptionally
方法可在需要时完成执行而不会阻塞任何线程。
As you can see, the API of CompletableFuture
future provides you with the complete
and completeExceptionally
methods that complete your execution whenever it is needed without blocking any thread.
在上一节中,我们对CF行为进行了概述,但是CompletableFuture和Mono之间的主要区别是什么?
In the previous section, we got an overview of CF behavior, but what is the central difference between CompletableFuture and Mono?
值得一提的是我们也可以阻止Mono,没有人阻止我们编写以下内容:
It worth to mention that we can do blocking Mono as well. No one prevents us from writing the following:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
当然,一旦我们订阅了Future,则调用者线程将是受阻。但是,我们始终可以通过提供其他 subscribeOn
运算符来解决此问题。不过, Mono
的更广泛的API并不是主要功能。
Of course, once we subscribe to the future, the caller thread will be blocked. But we can always work around that by providing an additional subscribeOn
operator. Nevertheless, the broader API of Mono
is not the key feature.
为了了解 CompletableFuture
和 Mono
,让我们回到前面提到的 myNonBlockingHttpCall
方法实现
In order to understand the main difference between CompletableFuture
and Mono
, lets back to previously mentioned myNonBlockingHttpCall
method implementation.
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
对于 CompletableFuture
,一旦调用了该方法,它将急切地执行对另一个服务/资源的HTTP调用。即使我们在验证某些前后条件后并不需要真正的执行结果,它也会开始执行,并且还会为该工作分配额外的CPU / DB-Connections / What-Ever-Machine-Resources。
In the case of CompletableFuture
, once the method is called, it will eagerly execute HTTP call to another service/resource. Even though we will not really need the result of the execution after verifying some pre/post conditions, it starts the execution, and additional CPU/DB-Connections/What-Ever-Machine-Resources will be allocated for this work.
相反, Mono
类型在定义上是惰性的:
In contrast, the Mono
type is lazy by definition:
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
在这种情况下,直到最后一个都不会发生mono
已订阅。因此,只有当 myNonBlockingHttpCallWithMono
方法返回的 Mono
被订阅时,提供给 Mono.create(Consumer)
将被执行。
In this case, nothing will happen until the final mono
is subscribed. Thus, only when Mono
returned by the myNonBlockingHttpCallWithMono
method, will be subscribed, the logic provided to Mono.create(Consumer)
will be executed.
我们可以走得更远。我们可以使执行更加懒惰。如您所知, Mono
从Reactive Streams规范扩展了 Publisher
。 Reactive Streams的尖叫功能是背压支持。因此,使用 Mono
API,我们仅在确实需要数据且订户已准备好使用它们时才可以执行:
And we can go even further. We can make our execution much lazier. As you might know, Mono
extends Publisher
from the Reactive Streams specification. The screaming feature of Reactive Streams is backpressure support. Thus, using the Mono
API we can do execution only when the data is really needed, and our subscriber is ready to consume them:
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
在此示例中,我们仅在以下情况下执行数据订户调用 Subscription#request
,这样便声明已准备好接收数据。
In this example, we execute data only when subscriber called Subscription#request
so by doing that it declared its readiness to receive data.
-
CompletableFuture
是异步的并且可以是非阻塞的 -
CompletableFuture
很渴望。您无法推迟执行。但是您可以取消它们(总比没有好) -
Mono
是异步/非阻塞的,可以轻松地在其上执行任何调用通过用不同的运算符组成主Mono
来实现不同的Thread
。 -
Mono
确实很懒,它可以通过订阅者的存在及其准备使用数据来推迟执行启动。
CompletableFuture
is async and can be non-blockingCompletableFuture
is eager. You can't postpone the execution. But you can cancel them (which is better than nothing)Mono
is async/non-blocking and can easily execute any call on differentThread
by composing the mainMono
with different operators.Mono
is truly lazy and allows postponing execution startup by the subscriber presence and its readiness to consume data.
这篇关于Mono vs CompletableFuture的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!