编写反应性管道的方面 [英] Writing Aspects For Reactive Pipelines

查看:50
本文介绍了编写反应性管道的方面的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为返回promise的方法编写方面.请考虑以下方法:

I am writing Aspects for methods that return promises. Consider the following method:

public Mono<Stream> publishToKafka(Stream s) {
    //publishToKafka is asynchronous
    return Mono.just(s).flatMap(worker::publishToKafka);
}

如果发布成功或失败,我想缓存.由于这是一个贯穿各领域的问题,因此外观似乎是最好的设计.这是我的观点.

I want to cache if the publish was successful or not. Since this is a cross-cutting concern, an Aspect looks like the best design. Here's my Aspect for it.

@Around("@annotation....")
public Object cache() {
    //get the data to cache from the annotation
    Object result = pjp.proceed();
    cache.cache("key","data");
    return result;
}

现在,由于 publishToKafka 是异步的,因此只要发生线程切换并调用 cache.cache(),目标方法就会返回.这不是我想要的.我想要的是,如果事件已成功发布到Kafka,则应将结果缓存.以下建议有效.

Now since publishToKafka is asynchronous, the target method returns as soon as the thread switch happens and cache.cache() is called. This is not what I want. What I want is that the result should be cached iff the event was successfully published to Kafka. The following advice works.

@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
    //get the data to cache from the annotation
    return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}

我想了解这里发生的事情.这会在管道的组装时间内发生吗?还是在执行期间( pjp.proceed()返回一个承诺),我的建议向其中添加了 doOnNext 运算符?

I want to understand what's going on here. Does this happen during the assembly time of the pipeline? Or during the execution time (pjp.proceed() returns a promise) to which my advice adds the doOnNext operator?

在此示例的上下文中,我需要了解汇编与执行时间.

I need to understand assembly vs. execution time in the context of this example.

推荐答案

Spring AOP和AspectJ方面始终在与被拦截的连接点相同的线程中同步执行.因此,如果您截获的方法立即返回,并且返回值类似于promise,future或没有(void)再加上回调,则您不能期望在方面的建议中神奇地获得异步结果.您确实需要使方面意识到异步情况.

Both Spring AOP and AspectJ aspects are always executed synchronously in the same thread as the intercepted joinpoint. Thus, if your intercepted method returns immediately and the return value is something like a promise, a future or nothing (void) in combination with a callback, you cannot expect to magically get the asynchronous result in the aspect's advice. You do need to make the aspect aware of the asynchronous situation.

话虽如此,我也想提一提,我以前从未使用过反应式编程,我只知道概念.根据我在您的建议中看到的,该解决方案应该有效,但有一点不是很好:您使建议返回一个 new Mono 实例

Having said that, I also want to mention that I never used reactive programming before, I only know the concept. From what I see in your advice, the solution should work, but one thing is not so nice: You make the advice return a new Mono instance returned by your doOnNext(..) call. Maybe it would be cleaner to return the original Mono you get from proceed() after having registered your caching callback on it, just so as to avoid any side-effects.

我不知道还有什么要解释的,情况很明显.如果我的解释不够,请随时提出直接相关的后续问题.

I don't know what else to explain, the situation is pretty clear. Feel free to ask directly related follow-up questions if my explanation does not suffice.

这篇关于编写反应性管道的方面的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆