无法让ActiveMQ重新发送我的消息 [英] Can't get ActiveMQ to resend my messages
问题描述
我有一个用Java编写的单线程ActiveMQ使用者。我要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它。如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送。
I have a single threaded ActiveMQ consumer written in Java. All I'm trying to do is receive() a messsage from the queue, attempt to send it to a web service, and if it succeeds acknowledge() it. If the web service call fails, I want the message to stay on the queue and be resent after some timeout.
除了重发部分之外,它或多或少都在工作:每次我重新启动我的消费者时,它会为每个仍然在队列中的消息收到一条消息,但是在发送它们之后,消息永远不会重新发送。
It's more or less working, except for the resending part: each time I restart my consumer, it gets one message for each that's still on the queue, but after failing to send them, the messages are never resent.
我的代码看起来喜欢:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(!client.sendMessage(msg)) {
System.out.println("Webservice call failed. Keeping message");
//message.
} else {
message.acknowledge();
}
if (transacted) {
if ((messagesReceived % batch) == 0) {
System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
我目前没有使用交易(也许我应该?)。
I'm not currently using transactions (maybe I should be?).
我确定我错过了一些简单的东西,很快就会打我的额头,但我似乎无法弄清楚这是怎么回事。谢谢!
I'm sure I'm missing something easy and will be slapping my forehead soon but I can't seem to figure out how this is supposed to work. Thanks!
编辑:我自己也不能回答这个问题:
Can't answer this myself as not enough rep:
好的,经过一些实验,事实证明交易是实现这一目标的唯一方法。这是新代码:
OK, after some more experimentation, it turns out transactions are the only way to do this. Here is the new code:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(client.sendMessage(msg)) {
if(transacted) {
System.out.println("Call succeeded - committing message");
session.commit();
}
//message.acknowledge();
} else {
if(transacted) {
System.out.println("Webservice call failed. Rolling back message");
session.rollback();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
现在,消息正在消息按照重新交付政策中的规定每隔1000毫秒重新发送一次。
Now, the message is being resent every 1000ms as specified in the Redelivery Policy.
希望这有助于其他人! :)
Hope this helps someone else! :)
推荐答案
您不必使用事务,CLIENT_ACK / Session.recover()也可以...
You don't have to use transactions, CLIENT_ACK/Session.recover() will work as well...
当发生以下任何情况时,邮件会重新传送到客户端:
Messages are redelivered to a client when any of the following occurs:
- 使用交易会话并调用rollback()。
- 交易会话已关闭在调用commit之前。
- 会话正在使用CLIENT_ACKNOWLEDGE并调用Session.recover()。
请参阅 http:// activemq .apache.org / message-redelivery-and-dlq-handling.html
这篇关于无法让ActiveMQ重新发送我的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!