Spring 5 Web Reactive - Hot Publishing - How to use EmitterProcessor to bridge a MessageListener to an event stream


Question

The sample project is located here: https://github.com/codependent/spring5-playground

I would like to bridge messages received from a JMS queue into a reactive controller that publishes them as an event stream.

I don't want the messages to be replayed: if a message arrives while there are no subscribers, I don't want it to be sent later when someone subscribes. So I am using an EmitterProcessor:

@Component
public class AlertEmitterProcessor {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private EmitterProcessor<Alert> processor;

    public AlertEmitterProcessor(){
        processor = EmitterProcessor.<Alert>create();
        processor.connect();
    }

    public EmitterProcessor<Alert> getProcessor() {
        return processor;
    }

    public void onNext(Alert alert){
        logger.info("onNext [{}]", alert);
        processor.onNext(alert);
    }

    public void onComplete(){
        logger.info("onComplete");
        processor.onComplete();
    }

    public void onError(Throwable t){
        logger.error("onError", t);
        processor.onError(t);
    }
}
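
To make the "no replay" requirement above concrete, here is a minimal sketch in plain Java. It does not use Reactor and is not how EmitterProcessor is implemented; the HotBroadcaster and HotBroadcasterDemo names are hypothetical and exist only for illustration. It shows the intended behaviour: a value published while nobody is subscribed is dropped, and a later subscriber only sees what is published after it attaches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for a hot publisher: no buffering, no replay. */
final class HotBroadcaster<T> {

    // Subscribers attached right now; the list tolerates concurrent subscribe/publish.
    private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

    /** Attaches a subscriber and returns a handle that detaches it again. */
    Runnable subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Delivers the value to the current subscribers and returns how many received it. */
    int publish(T value) {
        int delivered = 0;
        for (Consumer<? super T> subscriber : subscribers) {
            subscriber.accept(value);
            delivered++;
        }
        return delivered;
    }
}

final class HotBroadcasterDemo {
    public static void main(String[] args) {
        HotBroadcaster<String> alerts = new HotBroadcaster<>();

        alerts.publish("alert-1");                  // nobody is attached: dropped

        List<String> seen = new ArrayList<>();
        Runnable cancel = alerts.subscribe(seen::add);
        alerts.publish("alert-2");                  // delivered to the subscriber

        cancel.run();
        alerts.publish("alert-3");                  // subscriber detached: dropped

        System.out.println(seen);                   // prints [alert-2]
    }
}
```

Unlike a real reactive publisher, this sketch has no backpressure, completion, or error signals; it only illustrates the delivery semantics the question asks for.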

This is my MessageListener:

@Component
public class AlertMessageListener implements MessageListener{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired 
    private AlertEmitterProcessor alertProcessor;

    @Autowired
    private MappingJackson2HttpMessageConverter jacksonMessageConverter;

    @Override
    public void onMessage(Message message) {
        logger.info("Message received: [{}]", message);
        TextMessage tm = (TextMessage)message;
        try {
            Alert alert = jacksonMessageConverter.getObjectMapper().readValue(tm.getText(), Alert.class);
            alertProcessor.onNext(alert);
        } catch (IOException | JMSException e) {
            // Log the failure through the class logger instead of printing to stderr
            logger.error("Could not convert message to Alert", e);
        }
    }
}

And finally my REST controller:

@Autowired
private AlertEmitterProcessor alertTopicProcessor;

@Autowired 
private AlertMessageListener messageListener;

@Autowired
private MappingJackson2HttpMessageConverter jacksonMessageConverter;

@GetMapping(value="/accounts/{id}/alerts/live2", produces="text/event-stream")
public Flux<Alert> getAccountAlertsStreaming2(@PathVariable Integer id) {
    return alertTopicProcessor.getProcessor()
        .log().filter( a -> a.getAccountId().equals(id) );
}

To test its behaviour I have added this controller method, which simulates a message arriving on the queue:

@GetMapping(value="/mock/accounts/{id}/alerts/put", produces="text/event-stream")
public void putAlert(@PathVariable Integer id) throws JsonProcessingException {
    Alert alert = new Alert(id, (long)Math.round(Math.random()*10), "Message");
    String alertStr = jacksonMessageConverter.getObjectMapper().writeValueAsString(alert);
    TextMessage tm = new MockTextMessage(alertStr);
    messageListener.onMessage(tm);
}

Right after starting the application I load http://localhost:8080/accounts/1/alerts/live2 and the browser waits for data.

2016-10-03 13:43:38.755 DEBUG 12800 --- [nio-8080-exec-1] o.s.web.reactive.DispatcherHandler       : Processing GET request for [http://localhost:8080/accounts/1/alerts/live2]
2016-10-03 13:43:38.770 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /accounts/1/alerts/live2
2016-10-03 13:43:38.778 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)]
2016-10-03 13:43:38.779 DEBUG 12800 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:43:38.800  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@54d4fb36)
2016-10-03 13:43:38.802  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : request(unbounded)
2016-10-03 13:43:38.803  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : onNext(1)
2016-10-03 13:43:38.822  INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2          : onSubscribe(reactor.core.publisher.EmitterProcessor$EmitterSubscriber@227405f2)
2016-10-03 13:43:38.822  INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2          : request(1)
2016-10-03 13:43:38.823  INFO 12800 --- [nio-8080-exec-1] reactor.unresolved                       : onComplete()

Then I publish some messages via http://localhost:8080/mock/accounts/1/alerts/put.

2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] o.s.web.reactive.DispatcherHandler       : Processing GET request for [http://localhost:8080/mock/accounts/1/alerts/put]
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /mock/accounts/1/alerts/put
2016-10-03 13:43:43.068 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public void com.codependent.spring5.playground.reactive.web.AccountsRestController.putAlert(java.lang.Integer) throws com.fasterxml.jackson.core.JsonProcessingException]
2016-10-03 13:43:43.069 DEBUG 12800 --- [nio-8080-exec-2] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:43:43.071  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@2ba7d09c)
2016-10-03 13:43:43.071  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : request(unbounded)
2016-10-03 13:43:43.072  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : onNext(1)
2016-10-03 13:43:43.112  INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertMessageListener   : Message received: [com.codependent.spring5.playground.reactive.message.MockTextMessage@37262c9e]
2016-10-03 13:43:43.145  INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertEmitterProcessor  : onNext [Alert [alertId=3, message=Message, accountId=1]]
2016-10-03 13:43:43.146  INFO 12800 --- [nio-8080-exec-2] reactor.Flux.EmitterProcessor.2          : onNext(Alert [alertId=3, message=Message, accountId=1])
2016-10-03 13:43:43.177  INFO 12800 --- [nio-8080-exec-2] reactor.unresolved                       : onComplete()
2016-10-03 13:43:43.177 DEBUG 12800 --- [nio-8080-exec-2] o.s.h.s.r.ServletHttpHandlerAdapter      : Successfully completed request

But none of them reaches the browser. The request eventually ends with a 500 error (nothing in the logs).

After some manual retries it starts receiving data...

2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)]
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:45:07.727  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@909f06f)
2016-10-03 13:45:07.727  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : request(unbounded)
2016-10-03 13:45:07.727  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : onNext(1)
2016-10-03 13:45:07.729  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : onSubscribe(reactor.core.publisher.EmitterProcessor$EmitterSubscriber@7ce1f3e)
2016-10-03 13:45:07.729  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : request(1)
2016-10-03 13:45:07.729  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : onNext(Alert [alertId=4, message=Message, accountId=1])
2016-10-03 13:45:07.730  INFO 12800 --- [nio-8080-exec-8] reactor.unresolved                       : onComplete()
2016-10-03 13:45:07.747  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : request(1)
2016-10-03 13:45:07.747  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : onNext(Alert [alertId=0, message=Message, accountId=1])
2016-10-03 13:45:07.748  INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9          : request(1)

... but many other times it doesn't receive anything.

Recommended answer

It was resolved in Spring 5 M3:
