ActiveMQ重新交付不起作用 [英] ActiveMQ redelivery does not work

查看:111
本文介绍了ActiveMQ重新交付不起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用ActiveMQ实现死信队列。不幸的是,有关此方面的文档在某些方面还很模糊,我似乎无法正确设置所有内容。

I am trying the implement a dead letter queue using ActiveMQ. Unfortunately the documentation on this end is rather vague on some aspects and I can't seem to get everything properly set up.

我配置了以下Bean:

I have the following Beans configured:

@Bean
public JmsTemplate createJMSTemplate() {
    logger.info("createJMSTemplate");
    JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(queue);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    return jmsTemplate;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(getActiveMQConnectionFactory());
    factory.setConcurrency("1-10");
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

@Bean
public ConnectionFactory getActiveMQConnectionFactory() {
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));

    // Configure the redeliver policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setRedeliveryDelay(10000);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(3);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    return activeMQConnectionFactory;
}

这是我的接收代码:

@Autowired
private ConnectionFactory connectionFactory;

private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;

@PostConstruct
private void init() throws JMSException, InterruptedException {
    logger.info("Initializing QueueReceiver...");
    this.connection = connectionFactory.createConnection();
    this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue q = session.createQueue(queue);
    logger.info("Creating consumer for queue '{}'", q.getQueueName());
    MessageConsumer consumer = session.createConsumer(q);
    this.callback = new SegmentReceiver();
    consumer.setMessageListener(callback);
    this.connection.start();
}

@PreDestroy
private void destroy() throws JMSException {
    logger.info("Destroying QueueReceiver...");
    this.session.close();
    this.connection.close();
}

private class SegmentReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        logger.info("onMessage");
        try {
            TextMessage textMessage = (TextMessage) message;
            Segment segment = Segment.fromJSON(textMessage.getText());
            if (segment.shouldFail()) {
                throw new IOException("This segment is expected to fail");
            }
            System.out.println(segment.getText());
            message.acknowledge();
        }
        catch(IOException | JMSException exception) {
            logger.error(exception.toString());
            try {
                QueueReceiver.this.session.rollback();
            } catch (JMSException e) {
                logger.error(e.toString());
            }
            throw new RuntimeException(exception);
        }
    }

}

但是,什么都没发生。我正在使用默认配置使用现成的Apache ActiveMQ 5.14.2安装程序。我在这里想念什么?

However, nothing happens. I am using an out-of-the-box Apache ActiveMQ 5.14.2 setup using the default configuration. What am I missing here?

推荐答案

因为您正在使用
this.session = connection.createSession(false,Session .CLIENT_ACKNOWLEDGE);

调用 message.acknowledge(); 与调用 session相同.acknowledge();

because you are using this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
calling message.acknowledge(); is the same than calling session.acknowledge(); .

要使ActiveMQ重新交付成功与您的配置一起使用,有一些可能的变化很小:

to have ActiveMQ redelivery working successfully with your config, there is some possibilities with minimal changes:


  1. 调用 QueueReceiver.this.session.recover();

    在调用位置 QueueReceiver.this.session.rollback();

  1. calling QueueReceiver.this.session.recover();
    in place of calling QueueReceiver.this.session.rollback();




void org.apache.activemq.ActiveMQSession.recover()引发JMSException

void org.apache.activemq.ActiveMQSession.recover() throws JMSException

在此会话中停止消息传递,然后重新启动消息传递
带有最早的未确认消息。

Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.

所有使用者都按顺序发送邮件。确认收到的
消息会自动确认已将
传递给客户端的所有消息。

All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all messages that have been delivered to the client.

重新启动会话将使其执行以下操作:•停止
消息传递•标记所有可能已传递但未确认的消息
表示为已重新发送•重新开始发送顺序
,包括之前已发送过的所有未确认消息。重新传递的消息不必完全按照其原始传递顺序
传递。

Restarting a session causes it to take the following actions: •Stop message delivery •Mark all messages that might have been delivered but not acknowledged as "redelivered" •Restart the delivery sequence including all unacknowledged messages that had been previously delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.




  1. 使用
    this.session = connection.createSession(false,org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
    然后调用
    ((org.apache.activemq.command.ActiveMQMessage)message).acknowledge(); ,请注意,不调用此方法就像回滚,表示消息未被确认,并且在 onMessage 方法中引发异常将调用 QueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback()

  1. use this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); and call ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge(); , note that not calling this method is like a rollback, means the message is not acknowledged and throwing an exception in onMessage method will call QueueReceiver.this.consumer.rollback(); of org.apache.activemq.ActiveMQMessageConsumer.rollback().

只需调用 QueueReceiver.this .consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback()代替调用 QueueReceiver.this.session.rollback( );

这篇关于ActiveMQ重新交付不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆