Delete a specific queue when the client is idle in ActiveMQ
Question
I want to delete a specific queue while its consumer is down in ActiveMQ. I don't want the queued messages to be delivered once the consumer becomes available again. Any suggestions? Thanks in advance.
This is my publisher class:
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MessageHandler implements MessageListener {
    private static String url = "tcp://localhost:61616";
    private Session session;
    private MessageProducer producer;
    private MessageConsumer consumer;
    private Connection connection;
    // correlation ID -> response text, filled in by onMessage()
    private Map<String, String> messageStatus = new HashMap<String, String>();

    public void setup(String systemCode, String funCode, boolean synchronous) {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
            connection.start();
            if (synchronous) {
                session = connection.createSession(false, Session.SESSION_TRANSACTED);
            } else {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            }
            // Requests go out on "<system>-<function>-request"; replies arrive on "...-response".
            Destination requestQueue = session.createQueue(systemCode + "-" + funCode + "-request");
            producer = session.createProducer(requestQueue);
            Destination responseQueue = session.createQueue(systemCode + "-" + funCode + "-response");
            consumer = session.createConsumer(responseQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            throw new RuntimeException("Failed to initialize MessageHandler", e);
        }
    }

    public String sendMessage(String parameter) {
        String response = null;
        try {
            TextMessage message = session.createTextMessage(parameter);
            String messageId = UUID.randomUUID().toString();
            message.setJMSCorrelationID(messageId);
            producer.send(message);
            // Poll for up to 10 seconds until a reply with this correlation ID arrives.
            boolean carryon = true;
            long start = System.currentTimeMillis();
            long end = start + 10 * 1000;
            while (System.currentTimeMillis() < end && carryon) {
                if (checkStatus(messageId)) {
                    carryon = false;
                }
            }
            response = getMessage(messageId);
            stop();
        } catch (JMSException e) {
            try {
                stop();
            } catch (JMSException e1) {
                throw new RuntimeException("Failed to send Message", e);
            }
            throw new RuntimeException("Failed to send Message", e);
        }
        return response;
    }

    private String getMessage(String correlationId) {
        synchronized (this) {
            if (messageStatus.containsKey(correlationId)) {
                String status = messageStatus.get(correlationId);
                messageStatus.remove(correlationId);
                return status;
            } else {
                return null;
            }
        }
    }

    // synchronized so the polling thread sees updates made by the listener thread
    private synchronized boolean checkStatus(String messageId) {
        return messageStatus.containsKey(messageId);
    }

    public void onMessage(Message message) {
        synchronized (this) {
            try {
                if (message instanceof TextMessage) {
                    String originalMessageId = message.getJMSCorrelationID();
                    String responseText = ((TextMessage) message).getText();
                    messageStatus.put(originalMessageId, responseText);
                }
            } catch (JMSException e) {
                throw new RuntimeException("Failed to receive Message", e);
            }
        }
    }

    public void stop() throws JMSException {
        session.close();
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Caller Client.....");
        MessageHandler handler = new MessageHandler();
        handler.setup("P001", "FUC0001", true);
        String response = handler.sendMessage("xxxxxx");
        System.out.println(response);
    }
}
When I use Session.SESSION_TRANSACTED, my listener class cannot subscribe and there is no message in the queue. My goal: when there is no consumer, I want to delete the queue; if there is a consumer, it can subscribe.
Recommended answer
My requirement
Synchronous process
The client sends a message to the server, but the MessageListener is not active (it is down), so I want to remove that specific message from the queue.
How can I remove a specific message from the queue using its message ID?
I had the same problem, so here is a reusable function. You just need to pass the message ID and the queue name. It works for me.
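// Requires javax.management.*, javax.management.remote.* and
// org.apache.activemq.broker.jmx.BrokerViewMBean / QueueViewMBean.
// messageId is the JMSMessageID (message.getJMSMessageID() after send), not the correlation ID.
private void deleteMessage(String messageId, String queueName) {
    JMXConnector jmxc = null;
    try {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        jmxc = JMXConnectorFactory.connect(url);
        MBeanServerConnection conn = jmxc.getMBeanServerConnection();
        ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        BrokerViewMBean proxy = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, name, BrokerViewMBean.class, true);
        // Find the queue by name and ask the broker to remove the message.
        for (ObjectName queue : proxy.getQueues()) {
            QueueViewMBean queueBean = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, queue, QueueViewMBean.class, true);
            if (queueBean.getName().equals(queueName)) {
                boolean removed = queueBean.removeMessage(messageId);
                System.out.println((removed ? "Deleted : " : "Not found : ") + messageId);
                return;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close the JMX connection so repeated calls do not leak connections.
        if (jmxc != null) {
            try { jmxc.close(); } catch (java.io.IOException ignored) { }
        }
    }
}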
I use activemq-all-5.8.0.jar.
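One way to connect this function to the synchronous flow in the question is to wait for the reply with a timeout and remove the request only if no reply arrives. The sketch below is a minimal illustration of that idea using only the JDK; the class name ReplyWaiter and its methods are hypothetical and not part of ActiveMQ or JMS. The JMS-specific parts (sending, listening, and the JMX removal) are deliberately left out and represented by a callback.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical helper: correlates replies with requests and cleans up on timeout. */
public class ReplyWaiter {
    // correlation ID -> future that the listener thread completes
    private final Map<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    /** Register before sending, so a fast reply cannot be missed. */
    public void register(String correlationId) {
        pending.put(correlationId, new CompletableFuture<>());
    }

    /** Call from the message listener with the reply's correlation ID and text. */
    public void complete(String correlationId, String responseText) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future != null) {
            future.complete(responseText);
        }
    }

    /**
     * Blocks for up to timeoutMillis waiting for the reply.
     * On timeout, hands jmsMessageId to onTimeout (for example a function
     * that removes the unconsumed request from the queue) and returns null.
     */
    public String await(String correlationId, String jmsMessageId,
                        long timeoutMillis, Consumer<String> onTimeout) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            throw new IllegalStateException("Not registered: " + correlationId);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            onTimeout.accept(jmsMessageId);
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(correlationId);
        }
    }
}
```

In the question's sendMessage, this would presumably mean calling register(messageId) before producer.send(message), calling complete(...) from onMessage, and then calling await(...) with message.getJMSMessageID() (which the provider sets during send) and a callback such as id -> deleteMessage(id, queueName). Blocking on a future also avoids the busy loop that polls checkStatus for ten seconds.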