Java 反应器 - 链 Mono<Void>与另一个异步任务产生 Mono&lt;Object&gt; [英] Java reactor - chain Mono&lt;Void&gt; with another async task producing Mono&lt;Object&gt;

查看:455
本文介绍了Java 反应器 - 链 Mono<Void>与另一个异步任务产生 Mono&lt;Object&gt;的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下异步任务:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}

public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}

及以下服务类:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be invoked at all.
                   .flatMap(dontcare -> this.save.execute(o))
    }
}

如上所示,如果 AsyncValidationTask.execute 成功完成,我尝试使用 flatMap 链接 AsyncSaveTask.execute 调用,它赢了' 不工作,因为完成后没有发出任何东西 (Mono.empty).

As shown above, I tried to use flatMap to chain the AsyncSaveTask.execute call if the AsyncValidationTask.execute completes successfully, it won't work because nothing is emitted (Mono.empty) upon completion.

我还考虑将 then 链接到第二个调用中,但无论第一个验证调用产生的 Mono.error 为何,它都会始终调用链接的调用.

I also consider then to chain the second call, but it will always invoke the chained call regardless of the Mono.error produced by the first validation call.

如何正确链接它们?

推荐答案

.then 仅用于链接的终端源

使用.then,以便将您的执行链接与仅发送终端信号的进程.

.then for terminal only sources to chain

Use .then, in order to chain your execution with the process, which only sends a terminal signal.

另外,注意,如果你需要对错误信号做一些事情,那么你必须事先伴随你的.thenonErrorResume.

Also, pay attention, if you need to do something on the error signal, then you have to accompany your .then with onErrorResume beforehand.

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o))
    }
}

.defer 以推迟单声道创建

为了仅在验证成功的情况下执行 this.save.execute(o),您还必须将其包装在 Mono.defer 中:

.defer in order to postpone mono creation

In order to execute the this.save.execute(o) only in case of successful validation, you have to wrap it in Mono.defer as well:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)))
    }
}

通常没有必要,因为Mono是一个LAZY类型 应该 开始只在订阅发生的情况下工作(订阅 == .subscribe()).

Mono#then 的实现 保证 订阅 Monothis.save.execute 返回该方法开始RIGHT AFTERMono.defer(() -> this.validation.execute(o)) 完成.

The implementation of Mono#then guarantees that subscription to Mono returned by the this.save.execute the method starts RIGHT AFTER the Mono.defer(() -> this.validation.execute(o)) completed.

执行可能更早开始的唯一原因可能是目的(例如,有意提供此类行为的业务逻辑 - 缓存;热源等)ORthis.save.execute(o)INCORRECT 实现,无论实际订阅如何,它都会开始工作.

The only reason why execution may start earlier may be a PURPOSE (e.g., business logic which provides such behavior on purpose - caching; hot source, etc.) OR an INCORRECT implementation of the this.save.execute(o) which starts doing work regardless of actual subscription.

一般来说,确保 API 可以正常工作并将其公开为 Publisher(例如 Mono | Flux>) 是懒惰的.

In general, it is a good practice to ensure that API which does work and expose that work as a Publisher (e.g. Mono | Flux) is Lazy.

这意味着 API 创建者必须确保只有在用户订阅给定 Publisher 实例的情况下才会执行工作.

It means that the API creator MUST ensure that the work execution happens only in case the user has subscribed to the given Publisher instance.

例如,如果您的异步 API 在下面进行 CompletableFuture 创建,则值得手动将您的 CompletableFuture 创建包装到 Mono.defer 或使用适当的方法扩展,例如 Mono.fromFuture(Supplier> futureSupplier)

For example, if your async API does CompletableFuture creation underneath, it worth to manually wrap your CompletableFuture creation into Mono.defer or to use proper method extension, e.g Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

我们来考虑如何让一个常规的ThreadPool任务提交Reactive.

Let's consider how to make a regular ThreadPool task submission Reactive.

interface Executor  {
  Future<T> execute(Callable<T> runnable); 
}

因此,为了使 Executor 具有响应性,我们必须创建如下内容:

So, in order to make Executor reactive, we have to create something like the following:

interface ReactiveExecutor {
   Mono<T> execute(Callable<T> runnable);
}

错误的实现

以下是此类适配器的可能实现:

Incorrect Implementation

The following is a possible implementation of such an adapter which works:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      MonoProcessor<T> result = MonoProcessor.create();
      Future<T> task = delegate.execute(() -> {
          T value = runnable.call();
          result.onNext(value);
          result.onComplet();
          return value;
      });

      return result.doOnCancel(() -> task.cancel());
   }
}

当然,这样的实现将起作用.但是,它有几个关键问题:

Definitely, such an implementation will be working. However, it has a few critical issues:

  1. 执行从方法调用开始(这与响应式流Publisher的惰性行为有些矛盾)
  2. 由于在实际任务订阅之前开始执行,我们必须创建一个有状态的Mono,它支持以后的订阅.
  3. 这个实现不处理根本没有订阅者的情况(例如,执行已经开始,但没有发生 .subscribe 方法(然后我们得到了无法处理的值泄漏)
  4. 一般来说,作为一个解决方案太笨拙了.此外,为了防止前面提到的所有情况,有必要将 Mono execute(..) 上的每个调用与实现之外的 Mono.defer 包装起来(参见问题中的原始问题).随后,这会导致 API 用户很容易自爆",而忘记用额外的 .defer
  5. 包装执行
  1. Execution starts on the method invocation (which somewhat contradicts to lazy behavior of reactive streams Publisher)
  2. Since execution starts before the actual task subscription, we have to create a stateful Mono, which supports later subscription.
  3. This implementation does not handle the case when there are no Subscribers at all (e.g., execution has started, but no .subscribe method happened (then we got value leak which impossible to handle)
  4. It is too hacky in general to be a solution. Also, in order to prevent all the previously mentioned cases, it is necessary to wrap every call on Mono execute(..) with Mono.defer outside of the implementation (see the original problem in the question). Subsequently, it leads to the fact that an API user can easily 'shoot your self in the foot' forgetting to wrap execution with an extra .defer

那么,如何解决呢?

基本上,将 Mono.defer 移动到库内部就足够了.这将使 API 用户的生活变得更加轻松,因为他们不必考虑何时需要使用延迟(因此 - 不太可能出现的问题).

So, how to solve it, then?

Basically, it is enough to move the Mono.defer into the library internals. It will make the life of the API users much easier since they don't have to think when it is necessary to use deferring (hence - less possible issues).

例如,我们的 Reactive Executor 最简单的解决方案如下:

For example, the simplest solution for our Reactive Executor can be the following:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.defer(() -> {
          MonoProcessor<T> result = MonoProcessor.create();
          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              result.onNext(value);
              result.onComplet();
              return value;
          });

          return result.doOnCancel(() -> task.cancel());
     })
   }
}

通过推迟执行,我们至少可以肯定地解决一个问题——确保不再泄漏价值.

By just deferring the execution, we can solve at least one problem for sure - ensure value is not leaked anymore.

然而,为了解决该特定情况下的所有可能问题,我们可能会使用 Mono.create,它是为采用异步 API 正确设计的:

However, in order to solve all the possible problems in that particular case, we may use Mono.create which is properly designed for adopting async API:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.crete(monoSink -> {

          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              monoSink.complete(value);
          });

          monoSink.doOnCancel(task::cancel);
     })
   }
}

使用 Mono.create 我们可以保证在每个订阅者上延迟执行.此外,使用 MonoSink API,我们可以快速连接来自订阅者的所有基本信号.最后,Mono.create 确保在任何情况下都会适当地丢弃该值.

using Mono.create we have a guarantee of lazy execution on every subscriber. Also, using MonoSink API, we can quickly hook on all the essential signals from the subscriber. Finally, Mono.create ensures that in case of anything, the value will be discarded appropriately.

最后,有了这样的 API 就没有必要在所有情况下都使用 defer

Finally, having such an API it is not necessary to use defer for all the cases

这篇关于Java 反应器 - 链 Mono<Void>与另一个异步任务产生 Mono&lt;Object&gt;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆