ActiveMQ Spring Stomp: how can I change my existing code to create a persistent subscription?


Problem description

I have a working notification system in my project. My current code is:

My client (javascript):

let connectWebSocket = () => {
  socket = new SockJS(context.backend + '/myWebSocketEndPoint');
  stompClient = Stomp.over(socket);
  stompClient.connect({},function (frame) {
    stompClient.subscribe('/topic/notification', function(response){
      alert(response);
    });
  });
}
connectWebSocket();

Server (Java with Spring)

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/myWebSocketEndPoint")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}

This works. Now I also want to deliver notifications to users who were offline: when they log in, the notifications they missed should be sent to them automatically. I have to do this with ActiveMQ. I have seen some examples, but I don't understand them well. Can someone show me exactly how to edit my code to get a persistent subscription? Thanks a lot.

EDIT: I've updated my client-side code:

let connectWebSocket = () => {
  let clientId =user.profile.id;
  socket = new SockJS(context.backend + '/myWebSocketEndPoint');
  stompClient = Stomp.over(socket);
  stompClient.connect({"client-id": clientId},{},function (frame) {
    stompClient.subscribe('/topic/notification', function(response){
      alert(response);
    },{"activemq.subscriptionName": clientId});
  });
}

But if a notification arrives while a user is offline, it is not delivered to them when they come back online. I suppose I have to change the server side.

POM.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.2</version>
</dependency>

EDIT2: With the right dependency in pom.xml, I now get an error. I have this configuration:

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
   config.enableStompBrokerRelay("/topic/");
}

But when I run my code I see this error:

2017/01/24 17:17:15.751 ERROR [org.springframework.boot.SpringApplication:839] Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'stompBrokerRelayMessageHandler'; nested exception is java.lang.NoClassDefFoundError: reactor/io/codec/Codec

EDIT3: This is how I send notifications to clients:

@Component
public class MenuItemNotificationSender {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    public void sendNotification(MenuItemDto menuItem) {
        messagingTemplate.convertAndSend("/topic/notification", menuItem);
    }
}

Solution

If you want users to receive notifications that were sent while they were offline, you need to use durable subscriptions. This is the default behaviour for durable subscribers: with the default ActiveMQ configuration, messages will be persisted.
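To make the idea concrete, here is a toy in-memory model of what a durable subscription means. It is only a sketch and not how ActiveMQ is implemented: the class name DurableTopicSketch and its methods are made up for illustration, and delivery to online subscribers is left out. The point it tries to show is that the broker keeps messages per (client id, subscription name) pair while that subscriber is disconnected and hands them over on reconnect.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model for illustration only; this is not ActiveMQ code.
class DurableTopicSketch {
    // Messages waiting for each durable subscription, keyed by "clientId/subscriptionName".
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    // Subscriptions that currently have a live connection.
    private final Set<String> online = new HashSet<>();

    private static String key(String clientId, String subscriptionName) {
        return clientId + "/" + subscriptionName;
    }

    // Registers the subscription on first use and returns whatever was stored while it was offline.
    List<String> connect(String clientId, String subscriptionName) {
        String k = key(clientId, subscriptionName);
        List<String> stored = pending.computeIfAbsent(k, unused -> new ArrayList<>());
        List<String> missed = new ArrayList<>(stored);
        stored.clear();
        online.add(k);
        return missed;
    }

    // The subscription itself survives the disconnect; only the live connection goes away.
    void disconnect(String clientId, String subscriptionName) {
        online.remove(key(clientId, subscriptionName));
    }

    // Stores the message for every known subscription that is offline right now.
    // Delivery to online subscribers is out of scope for this sketch.
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : pending.entrySet()) {
            if (!online.contains(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }
}
```

Note that in this model a client that has never connected has no subscription yet, so nothing is stored for it. A user has to subscribe durably at least once before messages start being kept for them.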

EDIT

Persistent Messaging in STOMP

"STOMP messages are non-persistent by default. To use persistent messaging, add the following STOMP header to all SEND requests: persistent:true. This default is the opposite of that for JMS messages."

To persist messages sent from the JS client, add the header to the send call:

stompClient.send(destination,  {"persistent":"true" }, body);

Update your MenuItemNotificationSender like this:

public void sendNotification(MenuItemDto menuItem) {
    Map<String, Object> headers = new HashMap<>();
    headers.put("JMSDeliveryMode", 2);
    headers.put("persistent", "true");
    messagingTemplate.convertAndSend("/topic/notification", menuItem, headers);
}

Take a look at:

http://activemq.apache.org/how-do-i-make-messages-durable.html

http://activemq.apache.org/how-do-durable-queues-and-topics-work.html

To make durable subscriptions with STOMP:

stompClient.connect({"client-id": "my-client-id"}, function (frame) {
    console.log('Connected: ' + frame);

    stompClient.subscribe(topic, function (message) {
        // .....
        // .....
    }, {"activemq.subscriptionName": "my-client-id"});
}, function (frame) {
    console.log("Web socket disconnected");
});
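For reference, here is a rough sketch of what the three frames discussed in this answer look like on the wire, written in Java to match the server side. The class StompFrameSketch and its method names are invented for this example. Only the headers mentioned above are shown, header escaping is skipped, and a real client such as stomp.js adds further headers of its own, so treat this as an illustration of the frame layout rather than a client implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the STOMP text frames involved; header escaping and extra client headers are omitted.
class StompFrameSketch {
    // A STOMP frame is: COMMAND, one "name:value" line per header, a blank line, the body, then a NUL character.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    // CONNECT carrying the client-id that identifies the durable subscriber.
    static String connect(String clientId) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("client-id", clientId);
        return frame("CONNECT", h, "");
    }

    // SUBSCRIBE carrying the ActiveMQ-specific header that names the durable subscription.
    static String subscribe(String destination, String subscriptionName) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("destination", destination);
        h.put("activemq.subscriptionName", subscriptionName);
        return frame("SUBSCRIBE", h, "");
    }

    // SEND marked persistent, as described in the quoted documentation.
    static String send(String destination, String body) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("destination", destination);
        h.put("persistent", "true");
        return frame("SEND", h, body);
    }
}
```

Printing StompFrameSketch.subscribe("/topic/notification", "my-client-id") next to the frames logged by the stomp.js debug output is a quick way to confirm that the client-id and activemq.subscriptionName headers really reach the broker.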

UPDATE

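import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    // Embedded ActiveMQ broker that stores messages with KahaDB in ~/kaha.
    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        //broker.addConnector("tcp://localhost:61616");
        broker.addConnector("stomp://localhost:61613");
        broker.addConnector("vm://localhost");
        PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
        if (!dir.exists()) {
            dir.mkdirs();
        }
        persistenceAdapter.setDirectory(dir);
        broker.setPersistenceAdapter(persistenceAdapter);
        broker.setPersistent(true);
        return broker;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // If ActiveMQ runs locally, relayHost and relayPort do not need to be set.
        // relayHost and relayPort are not defined in this snippet; supply your own values.
        config.enableStompBrokerRelay("/topic/")
                .setRelayHost(relayHost)
                .setRelayPort(relayPort)
                // User and password, if needed:
                //.setSystemLogin(activeMqLogin)
                //.setSystemPasscode(activeMqPassword)
                ;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/myWebSocketEndPoint")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}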

Use the Spring Boot parent POM:

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.3.RELEASE</version>
  <relativePath /> <!-- lookup parent from repository -->
</parent>

and add these dependencies:

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-net</artifactId>
</dependency>
<dependency>
  <groupId>io.projectreactor.spring</groupId>
  <artifactId>reactor-spring-context</artifactId>
</dependency>

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-kahadb-store</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
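If the NoClassDefFoundError from the question still shows up after changing the POM, a small check like the one below can tell you whether the class is actually on the runtime classpath. This is just a diagnostic sketch: ClasspathCheckSketch and isPresent are names made up here, and the only thing taken from the question is the class name in the stack trace (reactor/io/codec/Codec, that is, reactor.io.codec.Codec).

```java
// Diagnostic sketch: reports whether a class can be loaded from the application's classpath.
class ClasspathCheckSketch {
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate and load the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class is missing or one of its own dependencies is.
            return false;
        }
    }
}
```

Calling ClasspathCheckSketch.isPresent("reactor.io.codec.Codec") at startup, before the broker relay is created, should return true once the reactor dependencies are resolved. If it returns false, running mvn dependency:tree is a reasonable next step to see which Reactor artifacts and versions Maven actually pulled in.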
