Mono vs CompletableFuture [英] Mono vs CompletableFuture

查看:140
本文介绍了Mono vs CompletableFuture的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

CompletableFuture 在单独的线程(使用线程池)上执行任务,并提供回调函数。假设我在 CompletableFuture 中有一个API调用。这是API调用阻止吗?线程会被阻塞,直到无法从API获得响应为止? (我知道主线程/ tomcat线程将是非阻塞的,但是执行CompletableFuture任务的线程又如何呢?)

CompletableFuture executes a task on a separate thread ( uses a thread-pool ) and provides a callback function. Let's say I have an API call in a CompletableFuture. Is that an API call blocking? Would the thread be blocked till it does not get a response from the API? ( I know main thread/tomcat thread will be non-blocking, but what about the thread on which CompletableFuture task is executing? )

Mono完全是非阻塞的,因为据我所知。

Mono is completely non-blocking, as far as I know.

请对此加以说明,如果我错了,请纠正我。

Please shed some light on this and correct me if I am wrong.

推荐答案

CompletableFuture是异步的。



关于CompletableFuture的一个事实是它确实是异步的,它允许您从调用者线程和API异步运行任务例如 thenXXX 允许您在结果可用时进行处理。另一方面, CompletableFuture 并非总是非阻塞的。例如,当您运行以下代码时,它将在默认的 ForkJoinPool 上异步执行:

CompletableFuture is Async. But is it non-blocking?

One which is true about CompletableFuture is that it is truly async, it allows you to run your task asynchronously from the caller thread and the API such as thenXXX allows you to process the result when it becomes available. On the other hand, CompletableFuture is not always non-blocking. For example, when you run the following code, it will be executed asynchronously on the default ForkJoinPool:

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});

很明显,<< c中的 Thread code> ForkJoinPool 执行该任务最终将被阻止,这意味着我们不能保证该调用不会被阻止。

It is clear that the Thread in ForkJoinPool that executes the task will be blocked eventually which means that we can't guarantee that the call will be non-blocking.

另一方面, CompletableFuture 公开了API,使您可以使其真正成为非阻塞。

On the other hand, CompletableFuture exposes API which allows you to make it truly non-blocking.

例如,您始终可以执行以下操作:

For example, you can always do the following:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}

如您所见, CompletableFuture 的API为您提供了 complete completeExceptionally 方法可在需要时完成执行而不会阻塞任何线程。

As you can see, the API of CompletableFuture future provides you with the complete and completeExceptionally methods that complete your execution whenever it is needed without blocking any thread.

在上一节中,我们对CF行为进行了概述,但是CompletableFuture和Mono之间的主要区别是什么?

In the previous section, we got an overview of CF behavior, but what is the central difference between CompletableFuture and Mono?

值得一提的是我们也可以阻止Mono,没有人阻止我们编写以下内容:

It worth to mention that we can do blocking Mono as well. No one prevents us from writing the following:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})

当然,一旦我们订阅了Future,则调用者线程将是受阻。但是,我们始终可以通过提供其他 subscribeOn 运算符来解决此问题。不过, Mono 的更广泛的API并不是主要功能。

Of course, once we subscribe to the future, the caller thread will be blocked. But we can always work around that by providing an additional subscribeOn operator. Nevertheless, the broader API of Mono is not the key feature.

为了了解 CompletableFuture Mono ,让我们回到前面提到的 myNonBlockingHttpCall 方法实现

In order to understand the main difference between CompletableFuture and Mono, lets back to previously mentioned myNonBlockingHttpCall method implementation.

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}

对于 CompletableFuture ,一旦调用了该方法,它将急切地执行对另一个服务/资源的HTTP调用。即使我们在验证某些前后条件后并不需要真正的执行结果,它也会开始执行,并且还会为该工作分配额外的CPU / DB-Connections / What-Ever-Machine-Resources。

In the case of CompletableFuture, once the method is called, it will eagerly execute HTTP call to another service/resource. Even though we will not really need the result of the execution after verifying some pre/post conditions, it starts the execution, and additional CPU/DB-Connections/What-Ever-Machine-Resources will be allocated for this work.

相反, Mono 类型在定义上是惰性的:

In contrast, the Mono type is lazy by definition:

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}

在这种情况下,直到最后一个都不会发生mono 已订阅。因此,只有当 myNonBlockingHttpCallWithMono 方法返回的 Mono 被订阅时,提供给 Mono.create(Consumer)将被执行。

In this case, nothing will happen until the final mono is subscribed. Thus, only when Mono returned by the myNonBlockingHttpCallWithMono method, will be subscribed, the logic provided to Mono.create(Consumer) will be executed.

我们可以走得更远。我们可以使执行更加懒惰。如您所知, Mono 从Reactive Streams规范扩展了 Publisher 。 Reactive Streams的尖叫功能是背压支持。因此,使用 Mono API,我们仅在确实需要数据且订户已准备好使用它们时才可以执行:

And we can go even further. We can make our execution much lazier. As you might know, Mono extends Publisher from the Reactive Streams specification. The screaming feature of Reactive Streams is backpressure support. Thus, using the Mono API we can do execution only when the data is really needed, and our subscriber is ready to consume them:

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});

在此示例中,我们仅在以下情况下执行数据订户调用 Subscription#request ,这样便声明已准备好接收数据。

In this example, we execute data only when subscriber called Subscription#request so by doing that it declared its readiness to receive data.


  • CompletableFuture 是异步的并且可以是非阻塞的

  • CompletableFuture 很渴望。您无法推迟执行。但是您可以取消它们(总比没有好)

  • Mono 是异步/非阻塞的,可以轻松地在其上执行任何调用通过用不同的运算符组成主 Mono 来实现不同的 Thread

  • Mono 确实很懒,它可以通过订阅者的存在及其准备使用数据来推迟执行启动。

  • CompletableFuture is async and can be non-blocking
  • CompletableFuture is eager. You can't postpone the execution. But you can cancel them (which is better than nothing)
  • Mono is async/non-blocking and can easily execute any call on different Thread by composing the main Mono with different operators.
  • Mono is truly lazy and allows postponing execution startup by the subscriber presence and its readiness to consume data.

这篇关于Mono vs CompletableFuture的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆