如何连续读取一个线程中的JMS消息并根据另一个线程中的JMSMessageID对其进行了解? [英] How to continuously read JMS Messages in a thread and achnowledge them based on their JMSMessageID in another thread?

查看:164
本文介绍了如何连续读取一个线程中的JMS消息并根据另一个线程中的JMSMessageID对其进行了解?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我写了一个连续的JMS消息接收器: 在这里,我正在使用CLIENT_ACKNOWLEDGE,因为我不希望该线程确认消息.

 (...)
connection.start();
session = connection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue(QueueId);
receiver = session.createReceiver(queue);
While (true) {
  message = receiver.receive(1000);
  if ( message != null ) {
    // NB : I can only pass Strings to the other thread
    sendMessageToOtherThread( message.getText() , message.getJMSMessageID() ); 
  }
  // TODO Implement criteria to exit the loop here
}
 

在另一个线程中,我将执行以下操作(成功处理之后):
这是在同时执行的独特JMS连接中.

public void AcknowledgeMessage(String messageId) {
  if (this.first) {
    this.connection.start();
    this.session = this.connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE );
    this.queue = this.session.createQueue(this.QueueId);
  }
  QueueReceiver receiver = this.session.createReceiver(this.queue, "JMSMessageID='" + messageId + "'");
  Message AckMessage = receiver.receive(2000);
  receiver.close();
}

似乎未找到该消息(超时后, AckMessage为空),但是该消息确实存在于队列中. 我怀疑该消息被连续输入线程阻止..确实,当单独触发AcknowledgeMes​​sage()时,它可以正常工作.

是否有一种更清洁的方式来检索1条消息?根据其QueueId和messageId
另外,如果连续阅读器必须长时间记住消息或ID,我觉得连续阅读器中可能会出现内存泄漏的风险.

如果我正在使用QueueBrowser以避免影响确认线程,则看来我无法获得此连续的输入提要..对吗?

更多上下文:我正在使用 ActiveMQ ,这2个线程是2个.
注意:简化了代码示例以关注该问题.

好吧,您不能两次阅读该消息,因为您已经在第一个线程中阅读了该消息.

ActiveMQ不会删除该消息,因为您尚未确认它,但是直到您断开JMS连接(我不确定ActiveMQ中是否还有很长的超时时间)之后,该消息才可见.

因此,您将必须使用原始消息并执行:message.acknowledge();. 但是请注意,会话不是线程安全的,因此如果在两个不同的线程中执行此操作,请务必小心.

I've written a Continuous JMS Message reveiver : Here, I'm using CLIENT_ACKNOWLEDGE because I don't want this thread to acknowledge the messages.

(...)
connection.start();
session = connection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue(QueueId);
receiver = session.createReceiver(queue);
While (true) {
  message = receiver.receive(1000);
  if ( message != null ) {
    // NB : I can only pass Strings to the other thread
    sendMessageToOtherThread( message.getText() , message.getJMSMessageID() ); 
  }
  // TODO Implement criteria to exit the loop here
}

In another thread, I'll do something as follows (after successful processing) :
This is in a distinct JMS Connection executed simultaneously.

public void AcknowledgeMessage(String messageId) {
  if (this.first) {
    this.connection.start();
    this.session = this.connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE );
    this.queue = this.session.createQueue(this.QueueId);
  }
  QueueReceiver receiver = this.session.createReceiver(this.queue, "JMSMessageID='" + messageId + "'");
  Message AckMessage = receiver.receive(2000);
  receiver.close();
}

It appears that the message is not found (AckMessage is null after timeout) whereas it does exist in the Queue. I suspect the message to be blocked by the continuous input thread.. indeed, when firing the AcknowledgeMessage() alone, it works fine.

Is there a cleaner way to retrieve 1 message ? based on its QueueId and messageId
Also, I feel like there could be a risk of memory leak in the continuous reader if it has to memorize the Messages or IDs during a long time.. justified ?

If I'm using a QueueBrowser to avoid impacting the Acknowledge Thread, it looks like I cannot have this continuous input feed.. right ?

More context : I'm using ActiveMQ and the 2 threads are 2 custom "Steps" of a Pentaho Kettle transformation.
NB : Code samples are simplified to focus on the issue.

解决方案

Well, you can't read that message twice, since you have already read it in the first thread.

ActiveMQ will not delete the message as you have not acknowledge it, but it won't be visible until you drop the JMS connection (I'm not sure if there is a long timeout here as well in ActiveMQ).

So you will have to use the original message and do: message.acknowledge();. Note, however, that sessions are not thread safe, so be careful if you do this in two different threads.

这篇关于如何连续读取一个线程中的JMS消息并根据另一个线程中的JMSMessageID对其进行了解?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆