编写反应性管道的方面 [英] Writing Aspects For Reactive Pipelines
问题描述
我正在为返回promise的方法编写方面.请考虑以下方法:
I am writing Aspects for methods that return promises. Consider the following method:
public Mono<Stream> publishToKafka(Stream s) {
//publishToKafka is asynchronous
return Mono.just(s).flatMap(worker::publishToKafka);
}
如果发布成功或失败,我想缓存.由于这是一个贯穿各领域的问题,因此外观似乎是最好的设计.这是我的观点.
I want to cache if the publish was successful or not. Since this is a cross-cutting concern, an Aspect looks like the best design. Here's my Aspect for it.
@Around("@annotation....")
public Object cache() {
//get the data to cache from the annotation
Object result = pjp.proceed();
cache.cache("key","data");
return result;
}
现在,由于 publishToKafka
是异步的,因此只要发生线程切换并调用 cache.cache()
,目标方法就会返回.这不是我想要的.我想要的是,如果事件已成功发布到Kafka,则应将结果缓存.以下建议有效.
Now since publishToKafka
is asynchronous, the target method returns as soon as the thread switch happens and cache.cache()
is called. This is not what I want. What I want is that the result should be cached iff the event was successfully published to Kafka. The following advice works.
@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
//get the data to cache from the annotation
return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}
我想了解这里发生的事情.这会在管道的组装时间内发生吗?还是在执行期间( pjp.proceed()
返回一个承诺),我的建议向其中添加了 doOnNext
运算符?
I want to understand what's going on here. Does this happen during the assembly time of the pipeline? Or during the execution time (pjp.proceed()
returns a promise) to which my advice adds the doOnNext
operator?
在此示例的上下文中,我需要了解汇编与执行时间.
I need to understand assembly vs. execution time in the context of this example.
推荐答案
Spring AOP和AspectJ方面始终在与被拦截的连接点相同的线程中同步执行.因此,如果您截获的方法立即返回,并且返回值类似于promise,future或没有(void)再加上回调,则您不能期望在方面的建议中神奇地获得异步结果.您确实需要使方面意识到异步情况.
Both Spring AOP and AspectJ aspects are always executed synchronously in the same thread as the intercepted joinpoint. Thus, if your intercepted method returns immediately and the return value is something like a promise, a future or nothing (void) in combination with a callback, you cannot expect to magically get the asynchronous result in the aspect's advice. You do need to make the aspect aware of the asynchronous situation.
话虽如此,我也想提一提,我以前从未使用过反应式编程,我只知道概念.根据我在您的建议中看到的,该解决方案应该有效,但有一点不是很好:您使建议返回一个 new Mono
实例
Having said that, I also want to mention that I never used reactive programming before, I only know the concept. From what I see in your advice, the solution should work, but one thing is not so nice: You make the advice return a new Mono
instance returned by your doOnNext(..)
call. Maybe it would be cleaner to return the original Mono
you get from proceed()
after having registered your caching callback on it, just so as to avoid any side-effects.
我不知道还有什么要解释的,情况很明显.如果我的解释不够,请随时提出直接相关的后续问题.
I don't know what else to explain, the situation is pretty clear. Feel free to ask directly related follow-up questions if my explanation does not suffice.
这篇关于编写反应性管道的方面的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!