为反应式管道编写方面 [英] Writing Aspects For Reactive Pipelines

查看:30
本文介绍了为反应式管道编写方面的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为返回承诺的方法编写方面.考虑以下方法:

I am writing Aspects for methods that return promises. Consider the following method:

public Mono<Stream> publishToKafka(Stream s) {
    //publishToKafka is asynchronous
    return Mono.just(s).flatMap(worker::publishToKafka);
}

我想缓存发布是否成功.由于这是一个跨领域的关注点,Aspect 看起来是最好的设计.这是我的观点.

I want to cache if the publish was successful or not. Since this is a cross-cutting concern, an Aspect looks like the best design. Here's my Aspect for it.

@Around("@annotation....")
public Object cache() {
    //get the data to cache from the annotation
    Object result = pjp.proceed();
    cache.cache("key","data");
    return result;
}

现在由于 publishToKafka 是异步的,一旦线程切换发生并且 cache.cache() 被调用,目标方法就会返回.这不是我想要的.我想要的是,如果事件已成功发布到 Kafka,则应缓存结果.以下建议有效.

Now since publishToKafka is asynchronous, the target method returns as soon as the thread switch happens and cache.cache() is called. This is not what I want. What I want is that the result should be cached iff the event was successfully published to Kafka. The following advice works.

@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
    //get the data to cache from the annotation
    return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}

我想了解这里发生了什么.这是否发生在管道组装期间?或者在执行时间(pjp.proceed() 返回一个承诺)期间,我的建议添加了 doOnNext 操作符?

I want to understand what's going on here. Does this happen during the assembly time of the pipeline? Or during the execution time (pjp.proceed() returns a promise) to which my advice adds the doOnNext operator?

我需要在此示例的上下文中了解程序集与执行时间.

I need to understand assembly vs. execution time in the context of this example.

推荐答案

Spring AOP 和 AspectJ 切面总是在与被拦截的连接点相同的线程中同步执行.因此,如果您的拦截方法立即返回并且返回值类似于承诺、未来或空(void)与回调的组合,您不能期望在方面的通知中神奇地获得异步结果.您确实需要让方面意识到异步情况.

Both Spring AOP and AspectJ aspects are always executed synchronously in the same thread as the intercepted joinpoint. Thus, if your intercepted method returns immediately and the return value is something like a promise, a future or nothing (void) in combination with a callback, you cannot expect to magically get the asynchronous result in the aspect's advice. You do need to make the aspect aware of the asynchronous situation.

说到这里,我还想提一下,我以前从未使用过响应式编程,只知道概念.从我在您的建议中看到的内容来看,该解决方案应该可行,但有一点不太好:您让建议返回一个 new Mono 实例 由您的 doOnNext 返回(..) 调用.也许在注册缓存回调后返回从 proceed() 获得的 original Mono 会更干净,只是为了避免任何副作用.

Having said that, I also want to mention that I never used reactive programming before, I only know the concept. From what I see in your advice, the solution should work, but one thing is not so nice: You make the advice return a new Mono instance returned by your doOnNext(..) call. Maybe it would be cleaner to return the original Mono you get from proceed() after having registered your caching callback on it, just so as to avoid any side-effects.

我不知道还能解释什么,情况已经很清楚了.如果我的解释不够充分,请随时提出直接相关的后续问题.

I don't know what else to explain, the situation is pretty clear. Feel free to ask directly related follow-up questions if my explanation does not suffice.

这篇关于为反应式管道编写方面的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆