JMS-Websocket - delayed message delivery
Problem description
This application receives messages from database events and forwards them to client applications. Messages are delivered immediately when the client browser has a web socket session.
However, when no web socket session exists and a message is sent by the JMSProducer into the Destination "jms/notificationQueue" in QueueSenderSessionBean, the message is immediately consumed in NotificationEndpoint. This is not my intent.
My intent is for the queue to retain the message until the user connects to NotificationEndpoint. If the user is not connected to the NotificationEndpoint, I think there should be no instance of NotificationEndpoint created to receive the message.
How do I delay the JMSConsumer consuming the message from the queue?
Overview - TomEE Plus 8.0.0-M1 project
- Application receives notification in a NotificationServlet HttpServletRequest
- String message is put into JMS Queue by QueueSenderSessionBean injected into NotificationServlet
- NotificationMessageDrivenBean implements MessageListener to listen to the JMS Queue
- An Event annotated with @NotificationServletJMSMessage is fired from NotificationMessageDrivenBean for an Observer in NotificationEndpoint method onJMSMessage.
- NotificationEndpoint uses PushContext which gathers all websocket sessions to deliver the message to the user
- In PushContext.send, if any websocket sessions have a user uuid property matching the message's user uuid property, the message is delivered to each of those websocket sessions.
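The per-user routing described in the last two bullets can be sketched without a container. The following is a minimal illustration, not the poster's code: `ClientSession`, `RecordingSession`, and `UserSessionRegistry` are hypothetical names, and `ClientSession` merely stands in for javax.websocket.Session.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for javax.websocket.Session so the sketch runs outside a container.
interface ClientSession {
    boolean isOpen();
    void sendText(String text);
}

// Hypothetical in-memory session that records what it was sent.
class RecordingSession implements ClientSession {
    final List<String> received = new ArrayList<>();
    boolean open = true;

    public boolean isOpen() { return open; }
    public void sendText(String text) { received.add(text); }
}

// Keeps one set of sessions per user uuid and delivers only to the matching user.
class UserSessionRegistry {
    private final Map<String, Set<ClientSession>> sessions = new ConcurrentHashMap<>();

    void add(String userUuid, ClientSession session) {
        sessions.computeIfAbsent(userUuid, k -> ConcurrentHashMap.newKeySet()).add(session);
    }

    void remove(String userUuid, ClientSession session) {
        Set<ClientSession> userSessions = sessions.get(userUuid);
        if (userSessions != null) {
            userSessions.remove(session);
        }
    }

    // Sends the text to every open session of the user and returns the delivery count.
    int send(String userUuid, String text) {
        int delivered = 0;
        Set<ClientSession> userSessions =
            sessions.getOrDefault(userUuid, Collections.<ClientSession>emptySet());
        for (ClientSession session : userSessions) {
            if (session.isOpen()) {
                session.sendText(text);
                delivered++;
            }
        }
        return delivered;
    }
}
```

A delivery count of zero is the situation the question is about: the message has already been consumed from the queue, but no session was there to receive it.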
My understanding of @ServerEndpoint is that "each new WS session gets its own instance" (quoted from the linked answer to "Notify only specific user(s) through WebSockets, when something is modified in the database").
Sources: the above link from https://stackoverflow.com/users/157882/balusc and https://blogs.oracle.com/theaquarium/integrating-websockets-and-jms-with-cdi-events-in-java-ee-7-v2
WEB-INF/resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<resources>
<Resource id="jmsConnectionFactory" type="javax.jms.ConnectionFactory">
connectionMaxIdleTime = 15 Minutes
connectionMaxWaitTime = 5 seconds
poolMaxSize = 10
poolMinSize = 0
resourceAdapter = Default JMS Resource Adapter
transactionSupport = xa
</Resource>
</resources>
NotificationServlet.java
import java.io.IOException;
import java.util.UUID;
import javax.annotation.Resource;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.jms.Queue;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/notifications")
public class NotificationServlet extends HttpServlet
{
@Resource(name = "jms/notificationQueue")
private Queue _notificationQueue;
@Inject
private QueueSenderSessionBean _queueSessionSenderBean;
@Override
protected void doGet(HttpServletRequest request,
HttpServletResponse response)
throws ServletException,
IOException
{
try
{
String notificationJson =
extractNotificationJson(request);
if (notificationJson != null)
{
_queueSessionSenderBean.sendMessage(
"notification="
+ notificationJson);
}
}
catch (Exception e)
{
e.printStackTrace();
// handle exception
}
}
public String extractNotificationJson(HttpServletRequest request)
throws IOException
{
if(request.getParameter("notification") != null)
{
String[] notificationString =
request.getParameterValues("notification");
return notificationString[0];
}
return null;
}
}
QueueSenderSessionBean.java
import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.DeliveryMode;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import org.json.JSONObject;
@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
@Resource(mappedName = "jms/notificationQueue")
private Queue _notificationQueue;
@Inject
@JMSConnectionFactory("jmsConnectionFactory")
private JMSContext _jmsContext;
// Static Methods
// Member Methods
public void sendMessage(String message)
{
try
{
JMSProducer messageProducer =
_jmsContext.createProducer();
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
String userProperty = "someValue";
TextMessage textMessage = _jmsContext.createTextMessage(message);
textMessage.setStringProperty("userProperty", userProperty);
messageProducer.send(_notificationQueue, textMessage);
}
catch (JMSException e)
{
e.printStackTrace();
// handle jms exception
}
}
}
Qualifier NotificationServletJMSMessage.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Qualifier;
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
public @interface NotificationServletJMSMessage
{
}
NotificationMessageDrivenBean.java
import javax.ejb.MessageDriven;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.Message;
import javax.jms.MessageListener;
@Named
@MessageDriven(mappedName = "jms/notificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
@Inject
@NotificationServletJMSMessage
Event<Message> jmsEvent;
@Override
public void onMessage(Message message)
{
jmsEvent.fire(message);
}
}
PushContext.java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.websocket.Session;
@ApplicationScoped
public class PushContext
{
@Inject
private JMSContext _jmsContext;
@Resource(mappedName = "jms/notificationQueue")
private Queue _notificationQueue;
private Map<String, Set<Session>> _sessions;
@PostConstruct
public void init()
{
_sessions = new ConcurrentHashMap<>();
}
public void add(Session session, String userUuid)
{
_sessions.computeIfAbsent(userUuid,
value -> ConcurrentHashMap.newKeySet()).add(session);
}
void remove(Session session)
{
_sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
}
public void send(Set<String> userUuids, Message message) throws JMSException
{
String userUuid = message.getStringProperty("userUuid");
userUuids.add(userUuid);
Set<Session> userSessions;
synchronized(_sessions)
{
userSessions = _sessions.entrySet().stream()
.filter(e -> userUuids.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
}
for (Session userSession : userSessions)
{
if (userSession.isOpen())
{
userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
}
}
}
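public void removeSession(Session session)
{
String userUuid = (String)session.getUserProperties().get("userUuid");
// Remove the session from the user's set; Map.remove(key, value) would never match a Set.
Set<Session> userSessions = (userUuid == null) ? null : _sessions.get(userUuid);
if (userSessions != null)
{
userSessions.remove(session);
}
}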
}
NotificationEndpoint.java
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint
{
private static final Set<Session> SESSIONS =
Collections.synchronizedSet(new HashSet<Session>());
private QueueSenderSessionBean _senderBean;
@Inject
private PushContext _pushContext;
@Inject
public NotificationEndpoint(QueueSenderSessionBean senderBean)
{
_senderBean = senderBean;
}
@OnOpen
public void onOpen(Session session,
EndpointConfig configurator,
@PathParam(value = "tokenId") String userUuidString)
{
session.getUserProperties().put("userUuid", userUuidString);
_pushContext.add(session, userUuidString);
}
@OnMessage
public void onMessage(String message, Session session)
throws IOException
{
System.out.println("Message received: " + message);
_senderBean.sendMessage(message);
}
@OnClose
public void onClose(CloseReason reason, Session session)
{
System.out.println(
"Closing 'notificationEndpoint' due to "
+ reason.getReasonPhrase());
try
{
session.close();
}
catch (IOException e)
{
e.printStackTrace();
}
_pushContext.removeSession(session);
}
@OnError
public void error(Session session, Throwable t)
{
t.printStackTrace();
}
public static void sendToAllClients(String message)
{
synchronized (SESSIONS)
{
for (Session session : SESSIONS)
{
if (session.isOpen())
{
session.getAsyncRemote().sendText(message);
}
}
}
}
public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message)
{
Set<String> userUuids = new HashSet<String>();
try
{
_pushContext.send(userUuids, message);
}
catch (JMSException ex)
{
ex.printStackTrace();
Logger.getLogger(NotificationEndpoint.class.getName()).
log(Level.SEVERE, null, ex);
}
}
}
Thank you, Ted S
Recommended answer
Inspired by the solution described here.
The solution was using a local queue to hold messages if the user was not connected to the web socket, then when connected, moving messages from the local queue to a remote queue which gets received/consumed immediately using the MessageDrivenBean.
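The hold-then-forward idea can be illustrated in plain Java. This is a sketch under stated assumptions rather than the JMS implementation: `HoldingRouter` is a hypothetical class, an in-memory map plays the role of the local queue, and a list plays the role of the remote queue that the MessageDrivenBean drains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical router: parks messages for offline users and forwards them on connect.
class HoldingRouter {
    // Stands in for the local queue: messages parked per user uuid.
    private final Map<String, Queue<String>> held = new HashMap<>();
    // Stands in for the remote queue, which is consumed immediately.
    private final List<String> forwarded = new ArrayList<>();
    private final Set<String> connected = new HashSet<>();

    synchronized void send(String userUuid, String message) {
        if (connected.contains(userUuid)) {
            forwarded.add(userUuid + ":" + message);
        } else {
            held.computeIfAbsent(userUuid, k -> new ArrayDeque<>()).add(message);
        }
    }

    // Called when the user's web socket opens: move the backlog over in arrival order.
    synchronized void onConnect(String userUuid) {
        connected.add(userUuid);
        Queue<String> backlog = held.remove(userUuid);
        if (backlog != null) {
            for (String message : backlog) {
                forwarded.add(userUuid + ":" + message);
            }
        }
    }

    synchronized void onDisconnect(String userUuid) {
        connected.remove(userUuid);
    }

    // Returns a copy so callers cannot mutate the router's state.
    synchronized List<String> forwarded() {
        return new ArrayList<>(forwarded);
    }
}
```

In the JMS version, the backlog lookup corresponds to a consumer on the local queue and the forwarding step to a producer sending to the remote queue.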
Also, instead of listening for messages from the database (Postgresql) with a Web Servlet, I changed my DB trigger to NOTIFY and started an asynchronous listener using the pgjdbc-ng driver and the Postgresql LISTEN/NOTIFY pattern described here.
NotificationListener.java
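import java.sql.Statement;
import javax.ejb.Stateless;
import javax.inject.Inject;
import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;
@Stateless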
public class NotificationListener extends Thread
{
@Inject
private QueueSenderSessionBean _queueSessionSenderBean;
@Override
public void run()
{
listenToNotifications();
}
public void listenToNotifications()
{
PGNotificationListener listener = new PGNotificationListener()
{
public void notification(int processId, String channelName, String payload)
{
System.out.println("Received notification from: "
+ channelName + ", "
+ payload);
_queueSessionSenderBean.sendMessage(payload);
}
};
PGDataSource dataSource = new PGDataSource();
dataSource.setHost("localhost");
dataSource.setDatabase("db");
dataSource.setPort(5432);
dataSource.setUser("user");
dataSource.setPassword("pass");
try(PGConnection connection =
(PGConnection) dataSource.getConnection())
{
Statement statement = connection.createStatement();
statement.execute("LISTEN notifications");
statement.close();
connection.addNotificationListener(listener);
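// Sleep between checks instead of spinning, and exit once interrupted.
while (!Thread.currentThread().isInterrupted())
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}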
}
catch (Exception e)
{
// TODO: handle exception
e.printStackTrace();
}
}
}
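A listener thread like the one above only needs to stay alive until shutdown. One way to park it without spinning is a latch. The sketch below assumes a hypothetical `KeepAliveWorker` class and leaves out the database connection entirely, so it shows only the waiting and shutdown mechanics.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical worker that blocks until asked to stop, using no CPU while it waits.
class KeepAliveWorker extends Thread {
    private final CountDownLatch stop = new CountDownLatch(1);
    private final CountDownLatch finished = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            // A real listener would open its connection and register callbacks here.
            stop.await(); // parks the thread until shutdown() is called
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            finished.countDown();
        }
    }

    // Releases the parked thread so run() can return.
    void shutdown() {
        stop.countDown();
    }

    // Returns true if the worker finished within the given time.
    boolean awaitFinished(long millis) {
        try {
            return finished.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling `interrupt()` on the worker also ends the wait, because `await()` throws InterruptedException and the `finally` block still runs.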
NotificationsStarter.java
@Singleton
@Startup
public class NotificationsStarter
{
@EJB
private NotificationListener _listener;
@PostConstruct
public void startListener()
{
_listener.start();
}
@PreDestroy
public void shutdown()
{
_listener.interrupt();
}
}
PushContext.java
@ApplicationScoped
public class PushContext
{
@Resource(mappedName = "jms/localNotificationQueue")
private Queue _localNotificationQueue;
@Resource(mappedName = "jms/remoteNotificationQueue")
private Queue _remoteNotificationQueue;
private Map<String, Set<Session>> _sessions;
@PostConstruct
public void init()
{
_sessions = new ConcurrentHashMap<>();
}
public void add(Session session, String userUuid)
{
_sessions.computeIfAbsent(userUuid,
value -> ConcurrentHashMap.newKeySet()).add(session);
}
void remove(Session session)
{
_sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
}
public void send(Set<String> userUuids, Message message) throws JMSException
{
String userUuid = message.getStringProperty("userUuid");
userUuids.add(userUuid);
Set<Session> userSessions;
synchronized(_sessions)
{
userSessions = _sessions.entrySet().stream()
.filter(e -> userUuids.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
for (Session userSession : userSessions)
{
if (userSession.isOpen())
{
userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
}
}
}
}
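public void removeSession(Session session)
{
String userUuid = (String)session.getUserProperties().get("userUuid");
// Remove the session from the user's set; Map.remove(key, value) would never match a Set.
Set<Session> userSessions = (userUuid == null) ? null : _sessions.get(userUuid);
if (userSessions != null)
{
userSessions.remove(session);
}
}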
public Boolean userHasWebSocketSession(String userUuid)
{
Boolean sessionOpen = false;
Set<String> userUuids = new HashSet<String>();
userUuids.add(userUuid);
Set<Session> userSessions;
synchronized(_sessions)
{
userSessions = _sessions.entrySet().stream()
.filter(e -> userUuids.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
}
for (Session userSession : userSessions)
{
if (userSession.isOpen())
{
sessionOpen = true;
break;
}
}
return sessionOpen;
}
}
QueueSenderSessionBean.java
@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
@Resource(mappedName = "jms/localNotificationQueue")
private Queue _localNotificationQueue;
@Resource(mappedName = "jms/remoteNotificationQueue")
private Queue _remoteNotificationQueue;
@Inject
@JMSConnectionFactory("jmsConnectionFactory")
private JMSContext _jmsContext;
@Inject
PushContext _pushContext;
public void sendMessage(String message)
{
JMSProducer messageProducer =
_jmsContext.createProducer();
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
try
{
String userProperty = "someValue";
TextMessage textMessage = _jmsContext.createTextMessage(message);
textMessage.setStringProperty("userProperty", userProperty );
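// NOTE: userUuid is not defined in this snippet as posted; it has to be derived from the
// message, e.g. from the "receivinguseruuid" JSON field read in QueueReceiverSessionBean.sendMessage.
Boolean userIsConnected =
_pushContext.userHasWebSocketSession(userUuid);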
if (!userIsConnected)
{
messageProducer.send(_localNotificationQueue, textMessage);
}
else
{
messageProducer.send(_remoteNotificationQueue, textMessage);
}
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}
NotificationMessageDrivenBean.java now listens only to the remote queue
@Named
@MessageDriven(mappedName = "jms/remoteNotificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
@Inject
@NotificationServletJMSMessage
Event<Message> jmsEvent;
@Override
public void onMessage(Message message)
{
jmsEvent.fire(message);
}
}
A new QueueReceiverSessionBean.java receives/consumes messages from the localNotificationQueue and places them in the remoteNotificationQueue whenever the user connects to the NotificationEndpoint web socket.
@Named
@LocalBean
@Stateless
public class QueueReceiverSessionBean
{
@Resource(mappedName = "jms/localNotificationQueue")
private Queue _localNotificationQueue;
@Resource(mappedName = "jms/remoteNotificationQueue")
private Queue _remoteNotificationQueue;
@Inject
@JMSConnectionFactory("jmsConnectionFactory")
private JMSContext _jmsContext;
public void receiveQueuedMessages(String userUuidString) throws JMSException
{
Set<String> userUuids =
new HashSet<String>();
userUuids.add(userUuidString);
JMSConsumer messageConsumer =
_jmsContext.createConsumer(_localNotificationQueue,
"userProperty='someValue'",
true);
JMSProducer messageProducer =
_jmsContext.createProducer();
Message localMessage =
messageConsumer.receive(10);
while(localMessage != null)
{
TextMessage textMessage =
_jmsContext.createTextMessage(((TextMessage) localMessage).getText());
textMessage.setStringProperty("userUuid", userUuidString);
messageProducer.send(_remoteNotificationQueue, textMessage);
localMessage.acknowledge();
localMessage =
messageConsumer.receive(10);
}
messageConsumer.close();
}
public void sendMessage(String message)
{
JMSProducer messageProducer =
_jmsContext.createProducer();
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
try
{
if (message.startsWith("notification"))
{
String messageJson = message.substring(message.indexOf("=") + 1);
JSONObject notificationJson =
new JSONObject(messageJson);
String userUuid = notificationJson.getString("receivinguseruuid");
TextMessage textMessage = _jmsContext.createTextMessage(message);
textMessage.setStringProperty("userUuid", userUuid);
messageProducer.send(_remoteNotificationQueue, textMessage);
}
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}
NotificationEndpoint.java
@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint implements Serializable
{
private static final long serialVersionUID = 1L;
private static final Set<Session> SESSIONS =
Collections.synchronizedSet(new HashSet<Session>());
private QueueReceiverSessionBean _senderBean;
@Inject
private PushContext _pushContext;
@Inject
public NotificationEndpoint(QueueReceiverSessionBean senderBean)
{
_senderBean = senderBean;
}
@OnOpen
public void onOpen(Session session,
EndpointConfig configurator,
@PathParam(value = "tokenId") String userUuidString)
{
session.getUserProperties().put("userUuid", userUuidString );
_pushContext.add(session, userUuidString);
try
{
_senderBean.receiveQueuedMessages(userUuidString);
}
catch (JMSException e)
{
e.printStackTrace();
}
}
@OnMessage
public void onMessage(String message, Session session)
throws IOException
{
_senderBean.sendMessage(message);
}
@OnClose
public void onClose(CloseReason reason, Session session)
{
try
{
session.close();
}
catch (IOException e)
{
e.printStackTrace();
}
_pushContext.removeSession(session);
}
@OnError
public void error(Session session, Throwable t)
{
t.printStackTrace();
}
public static void sendToAllClients(String message)
{
synchronized (SESSIONS)
{
for (Session session : SESSIONS)
{
if (session.isOpen())
{
session.getAsyncRemote().sendText(message);
}
}
}
}
public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message)
{
Set<String> userUuids = new HashSet<String>();
try
{
_pushContext.send(userUuids, message);
}
catch (JMSException ex)
{
ex.printStackTrace();
Logger.getLogger(NotificationEndpoint.class.getName()).
log(Level.SEVERE, null, ex);
}
}
}
Note: This code was used in the TomEE 8.0 container. Injecting JMSContext into the EJBs uncovered a bug in TomEE where the container fails to release the JMSConnection resource. The issue has been added to the TomEE issue tracker.