Java 反应器 - 链 Mono<Void>与另一个异步任务产生 Mono<Object> [英] Java reactor - chain Mono<Void> with another async task producing Mono<Object>
问题描述
我有以下异步任务:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
及以下服务类:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be invoked at all.
.flatMap(dontcare -> this.save.execute(o))
}
}
如上所示,如果 AsyncValidationTask.execute
成功完成,我尝试使用 flatMap
链接 AsyncSaveTask.execute
调用,它赢了' 不工作,因为完成后没有发出任何东西 (Mono.empty).
As shown above, I tried to use flatMap
to chain the AsyncSaveTask.execute
call if the AsyncValidationTask.execute
completes successfully, it won't work because nothing is emitted (Mono.empty) upon completion.
我还考虑将 then
链接到第二个调用中,但无论第一个验证调用产生的 Mono.error 为何,它都会始终调用链接的调用.
I also consider then
to chain the second call, but it will always invoke the chained call regardless of the Mono.error produced by the first validation call.
如何正确链接它们?
推荐答案
.then
仅用于链接的终端源
使用.then
,以便将您的执行链接与仅发送终端信号的进程.
.then
for terminal only sources to chain
Use .then
, in order to chain your execution with the process, which only sends a terminal signal.
另外,注意,如果你需要对错误信号做一些事情,那么你必须事先伴随你的.then
和onErrorResume
.
Also, pay attention, if you need to do something on the error signal, then you have to accompany your .then
with onErrorResume
beforehand.
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer
以推迟单声道创建
为了仅在验证成功的情况下执行 this.save.execute(o)
,您还必须将其包装在 Mono.defer
中:
.defer
in order to postpone mono creation
In order to execute the this.save.execute(o)
only in case of successful validation, you have to wrap it in Mono.defer
as well:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}
通常没有必要,因为Mono
是一个LAZY类型 应该 开始只在订阅发生的情况下工作(订阅 == .subscribe()
).
Mono#then
的实现 保证 订阅 Mono
由 this.save.execute
返回该方法开始RIGHT AFTERMono.defer(() -> this.validation.execute(o))
完成强>.
The implementation of Mono#then
guarantees that subscription to Mono
returned by the this.save.execute
the method starts RIGHT AFTER the Mono.defer(() -> this.validation.execute(o))
completed.
执行可能更早开始的唯一原因可能是目的(例如,有意提供此类行为的业务逻辑 - 缓存;热源等)ORthis.save.execute(o)
的 INCORRECT 实现,无论实际订阅如何,它都会开始工作.
The only reason why execution may start earlier may be a PURPOSE (e.g., business logic which provides such behavior on purpose - caching; hot source, etc.) OR an INCORRECT implementation of the this.save.execute(o)
which starts doing work regardless of actual subscription.
一般来说,确保 API 可以正常工作并将其公开为 Publisher
(例如 Mono
| Flux
>) 是懒惰的.
In general, it is a good practice to ensure that API which does work and expose that work as a Publisher
(e.g. Mono
| Flux
) is Lazy.
这意味着 API 创建者必须确保只有在用户订阅给定 Publisher
实例的情况下才会执行工作.
It means that the API creator MUST ensure that the work execution happens only in case the user has subscribed to the given Publisher
instance.
例如,如果您的异步 API 在下面进行 CompletableFuture
创建,则值得手动将您的 CompletableFuture
创建包装到 Mono.defer
或使用适当的方法扩展,例如 Mono.fromFuture(Supplier extends CompletableFuture extends T>> futureSupplier)
For example, if your async API does CompletableFuture
creation underneath, it worth to manually wrap your CompletableFuture
creation into Mono.defer
or to use proper method extension, e.g Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
我们来考虑如何让一个常规的ThreadPool任务提交Reactive.
Let's consider how to make a regular ThreadPool task submission Reactive.
interface Executor {
Future<T> execute(Callable<T> runnable);
}
因此,为了使 Executor
具有响应性,我们必须创建如下内容:
So, in order to make Executor
reactive, we have to create something like the following:
interface ReactiveExecutor {
Mono<T> execute(Callable<T> runnable);
}
错误的实现
以下是此类适配器的可能实现:
Incorrect Implementation
The following is a possible implementation of such an adapter which works:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
}
}
当然,这样的实现将起作用.但是,它有几个关键问题:
Definitely, such an implementation will be working. However, it has a few critical issues:
- 执行从方法调用开始(这与响应式流
Publisher
的惰性行为有些矛盾) - 由于在实际任务订阅之前开始执行,我们必须创建一个有状态的
Mono
,它支持以后的订阅. - 这个实现不处理根本没有订阅者的情况(例如,执行已经开始,但没有发生
.subscribe
方法(然后我们得到了无法处理的值泄漏)立> - 一般来说,作为一个解决方案太笨拙了.此外,为了防止前面提到的所有情况,有必要将
Mono execute(..)
上的每个调用与实现之外的Mono.defer
包装起来(参见问题中的原始问题).随后,这会导致 API 用户很容易自爆",而忘记用额外的.defer
包装执行
- Execution starts on the method invocation (which somewhat contradicts to lazy behavior of reactive streams
Publisher
) - Since execution starts before the actual task subscription, we have to create a stateful
Mono
, which supports later subscription. - This implementation does not handle the case when there are no Subscribers at all (e.g., execution has started, but no
.subscribe
method happened (then we got value leak which impossible to handle) - It is too hacky in general to be a solution. Also, in order to prevent all the previously mentioned cases, it is necessary to wrap every call on
Mono execute(..)
withMono.defer
outside of the implementation (see the original problem in the question). Subsequently, it leads to the fact that an API user can easily 'shoot your self in the foot' forgetting to wrap execution with an extra.defer
那么,如何解决呢?
基本上,将 Mono.defer
移动到库内部就足够了.这将使 API 用户的生活变得更加轻松,因为他们不必考虑何时需要使用延迟(因此 - 不太可能出现的问题).
So, how to solve it, then?
Basically, it is enough to move the Mono.defer
into the library internals. It will make the life of the API users much easier since they don't have to think when it is necessary to use deferring (hence - less possible issues).
例如,我们的 Reactive Executor 最简单的解决方案如下:
For example, the simplest solution for our Reactive Executor can be the following:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.defer(() -> {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
})
}
}
通过推迟执行,我们至少可以肯定地解决一个问题——确保不再泄漏价值.
By just deferring the execution, we can solve at least one problem for sure - ensure value is not leaked anymore.
然而,为了解决该特定情况下的所有可能问题,我们可能会使用 Mono.create
,它是为采用异步 API 正确设计的:
However, in order to solve all the possible problems in that particular case, we may use Mono.create
which is properly designed for adopting async API:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.crete(monoSink -> {
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
monoSink.complete(value);
});
monoSink.doOnCancel(task::cancel);
})
}
}
使用 Mono.create
我们可以保证在每个订阅者上延迟执行.此外,使用 MonoSink
API,我们可以快速连接来自订阅者的所有基本信号.最后,Mono.create 确保在任何情况下都会适当地丢弃该值.
using Mono.create
we have a guarantee of lazy execution on every subscriber.
Also, using MonoSink
API, we can quickly hook on all the essential signals from the subscriber.
Finally, Mono.create ensures that in case of anything, the value will be discarded appropriately.
最后,有了这样的 API 就没有必要在所有情况下都使用 defer
Finally, having such an API it is not necessary to use defer for all the cases
这篇关于Java 反应器 - 链 Mono<Void>与另一个异步任务产生 Mono<Object>的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!