春+的WebSocket + STOMP。给特定会话的消息(非用户) [英] Spring+WebSocket+STOMP. Message to specific session (NOT user)

查看:243
本文介绍了春+的WebSocket + STOMP。给特定会话的消息(非用户)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在Spring框架上设置基本的消息代理,使用我找到的配方此处

I am trying to set up basic message broker on Spring framework, using a recipe I found here

作者声称它运行良好,但我无法在客户端收到消息,但未发现任何可见错误。

Author claims it has worked well, but I am unable to receive messages on client, though no visible errors were found.

目标:

我想做的事情基本相同 - 一个客户端连接到服务器并请求一些异步操作。操作完成后,客户端应该收到一个事件。重要说明:Spring未对客户端进行身份验证,但来自消息代理的异步后端部分的事件包含其登录,因此我假设存储Login-SessionId对的并发映射以便直接向特定会话发送消息。

What I am trying to do is basically the same - a client connects to server and requests some async operation. After operation completes the client should receive an event. Important note: client is not authenticated by Spring, but an event from async back-end part of the message broker contains his login, so I assumed it would be enough to store concurrent map of Login-SessionId pairs for sending messages directly to particular session.

客户代码:

//app.js

var stompClient = null;
var subscription = '/user/queue/response';

//invoked after I hit "connect" button
function connect() {
//reading from input text form
var agentId = $("#agentId").val();

var socket = new SockJS('localhost:5555/cti');
stompClient = Stomp.over(socket);
stompClient.connect({'Login':agentId}, function (frame) {
    setConnected(true);
    console.log('Connected to subscription');
    stompClient.subscribe(subscription, function (response) {
        console.log(response);
    });
});

}

//invoked after I hit "send" button
function send() {

var cmd_str = $("#cmd").val();
var cmd = {
    'command':cmd_str
};
console.log("sending message...");
stompClient.send("/app/request", {}, JSON.stringify(cmd));
console.log("message sent");
}

这是我的配置。

//message broker configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{


@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    /** queue prefix for SUBSCRIPTION (FROM server to CLIENT)  */
    config.enableSimpleBroker("/topic");
    /** queue prefix for SENDING messages (FROM client TO server) */
    config.setApplicationDestinationPrefixes("/app");
}


@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {

    registry
            .addEndpoint("/cti")
            .setAllowedOrigins("*")
            .withSockJS();
}

}

现在,在基本配置后我应该实现一个应用程序事件处理程序,以提供有关客户端连接的会话相关信息。

Now, after basic config I should implement an application event handler to provide session-related information on client connect.

//application listener

@Service
public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> {

@Autowired
//this is basically a concurrent map for storing pairs "sessionId - login"
WebAgentSessionRegistry webAgentSessionRegistry;

@Override
public void onApplicationEvent(SessionConnectEvent event) {
    StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());

    String agentId = sha.getNativeHeader("Login").get(0);
    String sessionId = sha.getSessionId();

    /** add new session to registry */
    webAgentSessionRegistry.addSession(agentId,sessionId);

    //debug: show connected to stdout
    webAgentSessionRegistry.show();

}
}

到目前为止一切顺利。在IDE中运行我的spring webapp并从两个浏览器选项卡连接我的客户端后,我在IDE控制台中得到了这个:

All good so far. After I run my spring webapp in IDE and connected my "clients" from two browser tabs I got this in IDE console:

session_id / agent_id
-----------------------------
|kecpp1vt|user1|
|10g5e10n|user2|
-----------------------------

好的,现在让我们尝试实现消息机制。

Okay, now let's try to implement message mechanics.

//STOMPController


@Controller
public class STOMPController {

@Autowired
//our registry we have already set up earlier
WebAgentSessionRegistry webAgentSessionRegistry;
@Autowired
//a helper service which I will post below
MessageSender sender;

@MessageMapping("/request")
public void handleRequestMessage() throws InterruptedException {

    Map<String,String> params = new HashMap(1);
    params.put("test","test");
    //a custom object for event, not really relevant
    EventMessage msg = new EventMessage("TEST",params);

    //send to user2 (just for the sake of it)
    String s_id = webAgentSessionRegistry.getSessionId("user2");
    System.out.println("Sending message to user2. Target session: "+s_id);
    sender.sendEventToClient(msg,s_id);
    System.out.println("Message sent");

}
}

从任何地方发送消息的服务应用程序的一部分:

A service to send messages from any part of the application:

//MessageSender

@Service
public class MessageSender implements IMessageSender{

@Autowired
WebAgentSessionRegistry webAgentSessionRegistry;
@Autowired
SimpMessageSendingOperations messageTemplate;

private String qName = "/queue/response";

private MessageHeaders createHeaders(String sessionId) {
    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    headerAccessor.setSessionId(sessionId);
    headerAccessor.setLeaveMutable(true);
    return headerAccessor.getMessageHeaders();
}

@Override
public void sendEventToClient(EventMessage event,String sessionId) {
    messageTemplate.convertAndSendToUser(sessionId,qName,event,createHeaders(sessionId));
}
}

现在,让我们试试吧。我运行我的IDE,打开Chrome并创建了2个选项卡,我连接到服务器。 User1和User2。结果控制台:

Now, let's try to test it. I run my IDE, opened Chrome and created 2 tabs form which I connected to server. User1 and User2. Result console:

 session_id / agent_id
    -----------------------------
    |kecpp1vt|user1|
    |10g5e10n|user2|
    -----------------------------
Sending message to user2. Target session: 10g5e10n
Message sent

但是,正如我在开头提到的那样 - user2得到了绝对没有,虽然他已连接并订阅/ user / queue / response。也没有错误。

But, as I mentioned in the beginning - user2 got absolutely nothing, though he is connected and subscribed to "/user/queue/response". No errors either.

一个问题是,我究竟错过了哪一点?我已经阅读了很多关于这个主题的文章,但无济于事。
SPR-11309 说这是可能的,也应该有效。也许,id-s不是实际的会话ID-s?
好​​吧也许有人知道如何监控消息是否真的已被发送,而不是被内部Spring机制删除?

A question is, where exactly I am missing the point? I have read many articles on the subject, but to no avail. SPR-11309 says it's possible and should work. Maybe, id-s aren't actual session id-s? And well maybe someone knows how to monitor if the message actually has been sent, not dropped by internal Spring mechanics?

配置错误:

//WebSocketConfig.java:
....
 @Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    /** queue prefix for SUBSCRIPTION (FROM server to CLIENT)  */
    // + parameter "/queue"
    config.enableSimpleBroker("/topic","/queue");
    /** queue prefix for SENDING messages (FROM client TO server) */
    config.setApplicationDestinationPrefixes("/app");
}
....

我花了一天时间调试内部弹簧力学找出它出错的地方:

I've spent a day debugging internal spring mechanics to find out where exactly it goes wrong:

//AbstractBrokerMessageHandler.java: 
....
protected boolean checkDestinationPrefix(String destination) {
    if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
        return true;
    }
    for (String prefix : this.destinationPrefixes) {
        if (destination.startsWith(prefix)) {
//guess what? this.destinationPrefixes contains only "/topic". Surprise, surprise
            return true;
        }
    }
    return false;
}
....

虽然我不得不承认我仍然认为文档提到不要明确配置用户个人队列,因为他们已经存在。也许我错了。

Although I have to admit I still think the documentation mentioned that user personal queues aren't to be configured explicitly cause they "already there". Maybe I just got it wrong.

推荐答案

总的来说它看起来不错,但是你能改变吗?

Overall it looks good, but could you change from

config.enableSimpleBroker("/topic");

config.enableSimpleBroker("/queue");

...看看这是否有效?希望这有帮助。

... and see if this works? Hope this help.

这篇关于春+的WebSocket + STOMP。给特定会话的消息(非用户)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆