JMS-Websocket - delayed message delivery

Question

This application receives and forwards messages from database events to client applications. Messages are delivered immediately when the client browser has a web socket session.

However, when no web socket session exists and a message is sent by the JMSProducer into the Destination "jms/notificationQueue" in QueueSenderSessionBean, the message is immediately consumed in NotificationEndpoint. This is not my intent.

My intent is for the queue to retain the message until the user connects to NotificationEndpoint. If the user is not connected to the NotificationEndpoint, I think there should be no instance of NotificationEndpoint created to receive the message.

How do I delay the JMSConsumer consuming the message from the queue?

Overview - TomEE Plus 8.0.0-M1 project

  1. Application receives notification in a NotificationServlet HttpServletRequest
  2. String message is put into JMS Queue by QueueSenderSessionBean injected into NotificationServlet
  3. NotificationMessageDrivenBean implements MessageListener to listen to the JMS Queue
  4. An Event annotated with @NotificationServletJMSMessage is fired from NotificationMessageDrivenBean for an Observer in NotificationEndpoint method onJMSMessage.
  5. NotificationEndpoint uses PushContext which gathers all websocket sessions to deliver the message to the user
  6. In PushContext.send, if any websocket sessions have a user uuid property matching the message's user uuid property, the message is delivered to each of those websocket sessions.

My understanding of @ServerEndpoint is that "each new WS session gets its own instance" (quoted from "Notify only specific user(s) through WebSockets, when something is modified in the database").

Sources: the above link, from https://stackoverflow.com/users/157882/balusc, and https://blogs.oracle.com/theaquarium/integrating-websockets-and-jms-with-cdi-events-in-java-ee-7-v2

WEB-INF/resources.xml

<?xml version="1.0" encoding="UTF-8"?>
<resources>
    <Resource id="jmsConnectionFactory" type="javax.jms.ConnectionFactory">
        connectionMaxIdleTime = 15 Minutes
        connectionMaxWaitTime = 5 seconds
        poolMaxSize = 10
        poolMinSize = 0
        resourceAdapter = Default JMS Resource Adapter
        transactionSupport = xa
    </Resource>
</resources>

NotificationServlet.java

import java.io.IOException;
import java.util.UUID;

import javax.annotation.Resource;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.jms.Queue;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/notifications")
public class NotificationServlet extends HttpServlet
{
    @Resource(name = "jms/notificationQueue")
    private Queue _notificationQueue;

    @Inject
    private QueueSenderSessionBean _queueSessionSenderBean;

    @Override
    protected void doGet(HttpServletRequest request, 
            HttpServletResponse response) 
        throws ServletException, 
        IOException
    {
        try
        {
            String notificationJson =
                    extractNotificationJson(request);
            if (notificationJson != null)
            {
                _queueSessionSenderBean.sendMessage(
                        "notification=" 
                                + notificationJson);                
            }

        }
        catch (Exception e)
        {
            e.printStackTrace();
            // handle exception
        }
    }

    public String extractNotificationJson(HttpServletRequest request) 
            throws IOException
    {
        if(request.getParameter("notification") != null)
        {
            String[] notificationString = 
                    request.getParameterValues("notification");
            return notificationString[0];
        }
        return null;       
    }
}

QueueSenderSessionBean.java

import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.DeliveryMode;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;

import org.json.JSONObject;

@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
    @Resource(mappedName = "jms/notificationQueue")
    private Queue _notificationQueue;

    @Inject
    @JMSConnectionFactory("jmsConnectionFactory")
    private JMSContext _jmsContext; 

    // Static Methods

    // Member Methods
    public void sendMessage(String message) 
    {
        try
        {        
            JMSProducer messageProducer =
                _jmsContext.createProducer();
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

            String userProperty = "someValue";

            TextMessage textMessage = _jmsContext.createTextMessage(message);
            textMessage.setStringProperty("userProperty", userProperty);            
            messageProducer.send(_notificationQueue, textMessage);

        }
        catch (JMSException e)
        {
            e.printStackTrace();
            // handle jms exception
        }
    }
}

Qualifier NotificationServletJMSMessage.java

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.inject.Qualifier;

    @Qualifier
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
    public @interface NotificationServletJMSMessage
    {

    }

NotificationMessageDrivenBean.java

import javax.ejb.MessageDriven;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.Message;
import javax.jms.MessageListener;

@Named
@MessageDriven(mappedName = "jms/notificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
    @Inject
    @NotificationServletJMSMessage
    Event<Message> jmsEvent;

    @Override
    public void onMessage(Message message)
    {
        jmsEvent.fire(message);
    }
}

PushContext.java

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.websocket.Session;

@ApplicationScoped
public class PushContext
{
    @Inject 
    private JMSContext _jmsContext; 

    @Resource(mappedName = "jms/notificationQueue")
    private Queue _notificationQueue;

    private Map<String, Set<Session>> _sessions;

    @PostConstruct 
    public void init()
    {
        _sessions = new ConcurrentHashMap<>();
    }

    public void add(Session session, String userUuid)
    {
        _sessions.computeIfAbsent(userUuid, 
                value -> ConcurrentHashMap.newKeySet()).add(session);
    }

    void remove(Session session)
    {
        _sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
    }

    public void send(Set<String> userUuids, Message message) throws JMSException 
    {
        String userUuid = message.getStringProperty("userUuid");
        userUuids.add(userUuid);

        Set<Session> userSessions;

        synchronized(_sessions) 
        {
            userSessions = _sessions.entrySet().stream()
                .filter(e -> userUuids.contains(e.getKey()))
                .flatMap(e -> e.getValue().stream())
                .collect(Collectors.toSet());
        }
        for (Session userSession : userSessions) 
        {
            if (userSession.isOpen()) 
            {
                userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
            }
        }
    }

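    public void removeSession(Session session)
    {
        String userUuid = (String)session.getUserProperties().get("userUuid");
        // _sessions maps a user UUID to a Set<Session>, so remove the session
        // from that user's set rather than calling Map.remove(key, value).
        Set<Session> userSessions =
                (userUuid == null) ? null : _sessions.get(userUuid);
        if (userSessions != null)
        {
            userSessions.remove(session);
        }
    }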
}

NotificationEndpoint.java

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint 
{
    private static final Set<Session> SESSIONS =
            Collections.synchronizedSet(new HashSet<Session>()); 
    private QueueSenderSessionBean _senderBean;

    @Inject
    private PushContext _pushContext;

    @Inject
    public NotificationEndpoint(QueueSenderSessionBean senderBean)
    { 
        _senderBean = senderBean;
    }

    @OnOpen
    public void onOpen(Session session,
            EndpointConfig configurator,
            @PathParam(value = "tokenId") String userUuidString) 
    {
        session.getUserProperties().put("userUuid", userUuidString);        
        _pushContext.add(session, userUuidString);
    }


    @OnMessage
    public void onMessage(String message, Session session) 
            throws IOException 
    {
        System.out.println("Message received: " + message);
        _senderBean.sendMessage(message);
    }

    @OnClose
    public void onClose(CloseReason reason, Session session) 
    {
        System.out.println(
                "Closing notificationEndpoint due to " 
                + reason.getReasonPhrase());
        try
        {
            session.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        _pushContext.removeSession(session);
    }

    @OnError
    public void error(Session session, Throwable t) 
    {
       t.printStackTrace();
    }

    public static void sendToAllClients(String message) 
    {
        synchronized (SESSIONS) 
        {
            for (Session session : SESSIONS) 
            {
                if (session.isOpen()) 
                {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }

    public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message) 
    {
        Set<String> userUuids = new HashSet<String>();

        try 
        {
            _pushContext.send(userUuids, message);
        } 
        catch (JMSException ex) 
        {
            ex.printStackTrace();
            Logger.getLogger(NotificationEndpoint.class.getName()).
            log(Level.SEVERE, null, ex);
        }     
    }
}

Thank you, Ted S

Recommended answer

Inspired by the solution here.

The solution was using a local queue to hold messages if the user was not connected to the web socket, then when connected, moving messages from the local queue to a remote queue which gets received/consumed immediately using the MessageDrivenBean.
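
The routing rule described above can be modelled without a JMS broker. The following is a minimal in-memory sketch, not the answer's actual code: the names `HoldingRouter`, `route`, `onConnect` and `onDisconnect` are hypothetical, and plain Java collections stand in for the local (holding) and remote (delivery) JMS queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the two-queue approach. The per-user Deques
// stand in for the JMS holding queue, the list for the delivery queue.
class HoldingRouter
{
    private final Map<String, Deque<String>> held = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();
    private final Set<String> connectedUsers = new HashSet<>();

    // Deliver at once if the user is connected, otherwise hold the message.
    public synchronized void route(String userUuid, String message)
    {
        if (connectedUsers.contains(userUuid))
        {
            delivered.add(userUuid + ":" + message);
        }
        else
        {
            held.computeIfAbsent(userUuid, k -> new ArrayDeque<>()).addLast(message);
        }
    }

    // On connect, move that user's held messages to delivery in arrival order.
    public synchronized void onConnect(String userUuid)
    {
        connectedUsers.add(userUuid);
        Deque<String> backlog = held.remove(userUuid);
        if (backlog != null)
        {
            for (String message : backlog)
            {
                delivered.add(userUuid + ":" + message);
            }
        }
    }

    public synchronized void onDisconnect(String userUuid)
    {
        connectedUsers.remove(userUuid);
    }

    // Snapshot of everything delivered so far.
    public synchronized List<String> delivered()
    {
        return new ArrayList<>(delivered);
    }

    // Number of messages still waiting for the given user.
    public synchronized int heldCount(String userUuid)
    {
        Deque<String> backlog = held.get(userUuid);
        return backlog == null ? 0 : backlog.size();
    }
}
```

In the JMS version, a per-user message selector on the holding queue would play the role of the per-user `Deque` in this model.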

Also, instead of listening for messages from the database (Postgresql) with a Web Servlet, I changed my DB trigger to NOTIFY and started an asynchronous listener using the pgjdbc-ng driver and the Postgresql LISTEN/NOTIFY pattern described here.

NotificationListener.java

@Stateless
public class NotificationListener extends Thread
{
    @Inject
    private QueueSenderSessionBean _queueSessionSenderBean;

    @Override
    public void run()
    {
        listenToNotifications();
    }

    public void listenToNotifications()
    {
        PGNotificationListener listener = new PGNotificationListener()
                {
                    public void notification(int processId, String channelName, String payload)
                    {
                        System.out.println("Received notification from: "
                                + channelName + ", "
                                + payload);
                        _queueSessionSenderBean.sendMessage(payload);
                    }
                };
            PGDataSource dataSource = new PGDataSource();
            dataSource.setHost("localhost");
            dataSource.setDatabase("db");
            dataSource.setPort(5432);
            dataSource.setUser("user");
            dataSource.setPassword("pass");
        try(PGConnection connection =
                (PGConnection) dataSource.getConnection())
        {
            Statement statement = connection.createStatement();
            statement.execute("LISTEN notifications");
            statement.close();
            connection.addNotificationListener(listener);
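            // Keep the connection open until the thread is interrupted;
            // sleeping avoids spinning a CPU core while waiting.
            while (!Thread.currentThread().isInterrupted())
            {
                Thread.sleep(1000);
            }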
        }
        catch (Exception e)
        {
            // TODO: handle exception
            e.printStackTrace();
        }
    }   
}

NotificationsStarter.java

@Singleton
@Startup
public class NotificationsStarter
{
    @EJB
    private NotificationListener _listener;

    @PostConstruct
    public void startListener()
    {
        _listener.start();
    }

    @PreDestroy
    public void shutdown()
    {
        _listener.interrupt();
    }
}
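
A listener thread only has to stay alive until shutdown, so its lifecycle can be kept separate from the listening logic. The following is a minimal sketch using only `java.util.concurrent`; `ListenerLifecycle`, `start` and `shutdown` are hypothetical names, and inside a Java EE container a container-managed executor would normally be preferred over a plain `ExecutorService`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical lifecycle helper: runs a setup task on one thread, then parks
// that thread until it is interrupted at shutdown.
class ListenerLifecycle
{
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CountDownLatch stopped = new CountDownLatch(1);

    // Start the task; afterwards the thread blocks on a latch instead of polling.
    public void start(Runnable onStart)
    {
        executor.submit(() ->
        {
            try
            {
                onStart.run();                 // e.g. open the connection, register the listener
                new CountDownLatch(1).await(); // block until interrupted
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt(); // restore the flag and fall through
            }
            finally
            {
                stopped.countDown();
            }
        });
    }

    // Interrupt the task and wait up to timeoutMillis for it to finish.
    public boolean shutdown(long timeoutMillis) throws InterruptedException
    {
        executor.shutdownNow();
        return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

With this shape, `start` would be called from the `@PostConstruct` method and `shutdown` from the `@PreDestroy` method.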

PushContext.java

@ApplicationScoped
public class PushContext
{

    @Resource(mappedName = "jms/localNotificationQueue")
    private Queue _localNotificationQueue;

    @Resource(mappedName = "jms/remoteNotificationQueue")
    private Queue _remoteNotificationQueue;

    private Map<String, Set<Session>> _sessions;

    @PostConstruct 
    public void init()
    {
        _sessions = new ConcurrentHashMap<>();
    }

    public void add(Session session, String userUuid)
    {
        _sessions.computeIfAbsent(userUuid, 
                value -> ConcurrentHashMap.newKeySet()).add(session);
    }

    void remove(Session session)
    {
        _sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
    }

    public void send(Set<String> userUuids, Message message) throws JMSException 
    {
        String userUuid = message.getStringProperty("userUuid");
        userUuids.add(userUuid);

        Set<Session> userSessions;

        synchronized(_sessions) 
        {
            userSessions = _sessions.entrySet().stream()
                .filter(e -> userUuids.contains(e.getKey()))
                .flatMap(e -> e.getValue().stream())
                .collect(Collectors.toSet());
            for (Session userSession : userSessions) 
            {
                if (userSession.isOpen()) 
                {
                    userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
                }
            }
        }
    }

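    public void removeSession(Session session)
    {
        String userUuid = (String)session.getUserProperties().get("userUuid");
        // _sessions maps a user UUID to a Set<Session>, so remove the session
        // from that user's set rather than calling Map.remove(key, value).
        Set<Session> userSessions =
                (userUuid == null) ? null : _sessions.get(userUuid);
        if (userSessions != null)
        {
            userSessions.remove(session);
        }
    }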

    public Boolean userHasWebSocketSession(String userUuid)
    {
        Boolean sessionOpen = false;

        Set<String> userUuids = new HashSet<String>();
        userUuids.add(userUuid);

        Set<Session> userSessions;

        synchronized(_sessions) 
        {
            userSessions = _sessions.entrySet().stream()
                .filter(e -> userUuids.contains(e.getKey()))
                .flatMap(e -> e.getValue().stream())
                .collect(Collectors.toSet());
        }
        for (Session userSession : userSessions) 
        {
            if (userSession.isOpen()) 
            {
                sessionOpen = true;
                break;
            }
        }
        return sessionOpen;
    }
}
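
Because the session map is already keyed by user UUID, the open-session check can also be written as a single lookup. The following is a minimal sketch of that idea: `SessionRegistry` and `Conn` are hypothetical names, and `Conn` is a stand-in for `javax.websocket.Session` so the example compiles without a container.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for javax.websocket.Session; only isOpen() is needed here (assumption).
interface Conn
{
    boolean isOpen();
}

// Hypothetical registry mirroring PushContext's Map<String, Set<Session>>.
class SessionRegistry
{
    private final Map<String, Set<Conn>> sessions = new ConcurrentHashMap<>();

    public void add(String userUuid, Conn conn)
    {
        sessions.computeIfAbsent(userUuid, k -> ConcurrentHashMap.newKeySet()).add(conn);
    }

    // Remove one connection from the user's set; drop the key when the set empties.
    public void remove(String userUuid, Conn conn)
    {
        sessions.computeIfPresent(userUuid, (k, set) ->
        {
            set.remove(conn);
            return set.isEmpty() ? null : set;
        });
    }

    // True if the user has at least one open connection.
    public boolean hasOpenSession(String userUuid)
    {
        return sessions.getOrDefault(userUuid, Collections.emptySet())
                .stream()
                .anyMatch(Conn::isOpen);
    }
}
```

Note that `ConcurrentHashMap` rejects null keys, so callers of this sketch must pass a non-null UUID.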

QueueSenderSessionBean.java

@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
    @Resource(mappedName = "jms/localNotificationQueue")
    private Queue _localNotificationQueue;

    @Resource(mappedName = "jms/remoteNotificationQueue")
    private Queue _remoteNotificationQueue;

    @Inject
    @JMSConnectionFactory("jmsConnectionFactory")
    private JMSContext _jmsContext; 

    @Inject
    PushContext _pushContext;

    public void sendMessage(String message) 
    {
        JMSProducer messageProducer =
                _jmsContext.createProducer();
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        try
        {        
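            String userProperty = "someValue";
            // Assumption: the recipient's UUID is read from the JSON payload,
            // as in QueueReceiverSessionBean.sendMessage (needs org.json.JSONObject).
            String userUuid = new JSONObject(
                    message.substring(message.indexOf("=") + 1))
                    .getString("receivinguseruuid");

            TextMessage textMessage = _jmsContext.createTextMessage(message);
            textMessage.setStringProperty("userProperty", userProperty);
            Boolean userIsConnected = 
                    _pushContext.userHasWebSocketSession(userUuid);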
            if (!userIsConnected)
            {
                messageProducer.send(_localNotificationQueue, textMessage);
            }
            else
            {
                messageProducer.send(_remoteNotificationQueue, textMessage);
            }
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }    
}

NotificationMessageDrivenBean.java is now listening to only the remote queue

@Named
@MessageDriven(mappedName = "jms/remoteNotificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
    @Inject
    @NotificationServletJMSMessage
    Event<Message> jmsEvent;

    @Override
    public void onMessage(Message message)
    {
        jmsEvent.fire(message);
    }
}

A new QueueReceiverSessionBean.java receives/consumes messages from the localNotificationQueue and places them in the remoteNotificationQueue whenever the user connects to the NotificationEndpoint web socket.

@Named
@LocalBean
@Stateless
public class QueueReceiverSessionBean
{
    @Resource(mappedName = "jms/localNotificationQueue")
    private Queue _localNotificationQueue;

    @Resource(mappedName = "jms/remoteNotificationQueue")
    private Queue _remoteNotificationQueue;

    @Inject
    @JMSConnectionFactory("jmsConnectionFactory")
    private JMSContext _jmsContext; 

    public void receiveQueuedMessages(String userUuidString) throws JMSException
    {
        Set<String> userUuids =
                new HashSet<String>();
        userUuids.add(userUuidString);

        JMSConsumer messageConsumer = 
                _jmsContext.createConsumer(_localNotificationQueue,
                        "userProperty='someValue'",
                        true);

        JMSProducer messageProducer =
                _jmsContext.createProducer();

        Message localMessage =
                messageConsumer.receive(10);
        while(localMessage != null)
        {
            TextMessage textMessage = 
                    _jmsContext.createTextMessage(((TextMessage) localMessage).getText());
            textMessage.setStringProperty("userUuid", userUuidString);            
            messageProducer.send(_remoteNotificationQueue, textMessage);
            localMessage.acknowledge();
            localMessage =
                    messageConsumer.receive(10);
        } 
        messageConsumer.close();
    }

    public void sendMessage(String message) 
    {
        JMSProducer messageProducer =
                _jmsContext.createProducer();
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        try
        {        
            if (message.startsWith("notification"))
            {
                String messageJson = message.substring(message.indexOf("=") + 1);
                JSONObject notificationJson =
                        new JSONObject(messageJson);
                String userUuid = notificationJson.getString("receivinguseruuid");

                TextMessage textMessage = _jmsContext.createTextMessage(message);
                textMessage.setStringProperty("userUuid", userUuid);            
                messageProducer.send(_remoteNotificationQueue, textMessage);
            }
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}
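
The consumer above filters the holding queue with a fixed selector string. If the selector were built per user instead, the value would need quoting: JMS message selectors use SQL-92 style string literals, in which an embedded single quote is written as two single quotes. The helper below is a minimal sketch; `SelectorBuilder` and `forUser` are hypothetical names, and the property name `userUuid` is taken from the code above.

```java
// Hypothetical helper that builds a JMS message selector for one user.
final class SelectorBuilder
{
    private SelectorBuilder() { }

    // Returns a selector of the form: userUuid = '<value>'
    static String forUser(String userUuid)
    {
        if (userUuid == null)
        {
            throw new IllegalArgumentException("userUuid must not be null");
        }
        // Double any single quote so the value stays inside the string literal.
        String escaped = userUuid.replace("'", "''");
        return "userUuid = '" + escaped + "'";
    }
}
```

The result could be passed as the second argument of `createConsumer(queue, selector, noLocal)` in place of the fixed `"userProperty='someValue'"` string.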

NotificationEndpoint.java

@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint implements Serializable
{

    private static final long serialVersionUID = 1L;
    private static final Set<Session> SESSIONS =
            Collections.synchronizedSet(new HashSet<Session>()); 
    private QueueReceiverSessionBean _senderBean;

    @Inject
    private PushContext _pushContext;

    @Inject
    public NotificationEndpoint(QueueReceiverSessionBean senderBean)
    { 
        _senderBean = senderBean;
    }

    @OnOpen
    public void onOpen(Session session,
            EndpointConfig configurator,
            @PathParam(value = "tokenId") String userUuidString) 
    {
        session.getUserProperties().put("userUuid", userUuidString );        
        _pushContext.add(session, userUuidString);
        try
        {
            _senderBean.receiveQueuedMessages(userUuidString);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }


    @OnMessage
    public void onMessage(String message, Session session) 
            throws IOException 
    {
        _senderBean.sendMessage(message);
    }

    @OnClose
    public void onClose(CloseReason reason, Session session) 
    {
        try
        {
            session.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        _pushContext.removeSession(session);
    }

    @OnError
    public void error(Session session, Throwable t) 
    {
       t.printStackTrace();
    }

    public static void sendToAllClients(String message) 
    {
        synchronized (SESSIONS) 
        {
            for (Session session : SESSIONS) 
            {
                if (session.isOpen()) 
                {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }

    public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message) 
    {
        Set<String> userUuids = new HashSet<String>();

        try 
        {
            _pushContext.send(userUuids, message);
        } 
        catch (JMSException ex) 
        {
            ex.printStackTrace();
            Logger.getLogger(NotificationEndpoint.class.getName()).
            log(Level.SEVERE, null, ex);
        }     
    }    
}

Note: This code was used in the TomEE 8.0 container. Injecting JMSContext into the EJBs uncovered a bug in TomEE where the container fails to release the JMSConnection resource. The issue has been added to the TomEE issue tracker.
