Signal a rollback from a JMS MessageListener


Question

I've been working with JMS and ActiveMQ, and everything is working wonders. I am not using Spring, nor can I.

The interface javax.jms.MessageListener has only one method, onMessage. An implementation may throw an exception. If it does, I consider the message not properly processed, so it needs to be retried: I need ActiveMQ to wait a little while and then redeliver it. In other words, I need the thrown exception to roll back the JMS transaction.

How can I accomplish such behaviour?

Maybe there is some configuration in ActiveMQ that I wasn't able to find.

Or... maybe I could do away with registering MessageListeners on consumers and consume the messages myself, in a loop like:

while (true) {
    // ... some administrative stuff like ...
    session = connection.createSession(true, Session.SESSION_TRANSACTED);
    receiver = session.createConsumer(queue);
    try {
        Message m = receiver.receive(1000L); // returns null if nothing arrives in time
        if (m != null) {
            theMessageListener.onMessage(m);
            session.commit();
        }
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    // ... some more administrative stuff
}

running in a couple of threads, instead of registering the listener.

Or... I could somehow decorate/AOP/byte-manipulate the MessageListeners so that they do this themselves.
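To make the decorator route concrete, here is a minimal sketch. It is not ActiveMQ code: Listener and TxSession are hypothetical stand-ins for javax.jms.MessageListener and javax.jms.Session, cut down to the methods the sketch needs so that it compiles without a JMS jar. The wrapper delegates to the listener you don't control, commits on success, and rolls back when the delegate throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for javax.jms.MessageListener and javax.jms.Session.
interface Listener {
    void onMessage(String message);
}

interface TxSession {
    void commit() throws Exception;
    void rollback() throws Exception;
}

// Decorator: wraps an existing listener without touching its code.
final class TransactionalListener implements Listener {
    private final Listener delegate;
    private final TxSession session;

    TransactionalListener(Listener delegate, TxSession session) {
        this.delegate = delegate;
        this.session = session;
    }

    @Override
    public void onMessage(String message) {
        try {
            delegate.onMessage(message);
            session.commit();
        } catch (Exception e) {
            // Processing (or the commit) failed: roll back so the message can be redelivered.
            // The exception is swallowed here so the consuming thread keeps running.
            try {
                session.rollback();
            } catch (Exception rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
        }
    }
}

final class DecoratorDemo {
    // Delivers three messages to a listener that fails on the second delivery
    // and returns the session calls in the order they happened.
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        TxSession session = new TxSession() {
            @Override public void commit() { calls.add("commit"); }
            @Override public void rollback() { calls.add("rollback"); }
        };
        int[] deliveries = {0};
        Listener flaky = m -> {
            if (deliveries[0]++ == 1) {
                throw new RuntimeException("I failed, aaaaah");
            }
        };
        Listener wrapped = new TransactionalListener(flaky, session);
        wrapped.onMessage("Hello World 1");
        wrapped.onMessage("Hello World 2"); // fails, rolled back
        wrapped.onMessage("Hello World 2"); // simulated redelivery, succeeds
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [commit, rollback, commit]
    }
}
```

With real JMS types the same shape would wrap a MessageListener and hold a reference to the transacted Session that created the consumer. Whether to swallow or rethrow the exception after rolling back is a design choice this sketch does not settle.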

Which route would you take, and why?

Note: I don't have full control over the MessageListeners' code.

EDIT: A test as a proof of concept:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        message.acknowledge();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        message.acknowledge();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    brokerService.stop();

    assertEquals(3, atomicInteger.get());
}

Recommended answer

If you want to use SESSION_TRANSACTED as your acknowledgement mode, then you need to set up a RedeliveryPolicy on your Connection/ConnectionFactory. ActiveMQ's website also has a page with good information on what you might need to do.

Since you aren't using Spring, you can set up a RedeliveryPolicy with something similar to the following code (taken from one of the above links):

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
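
To get a feel for what those four settings imply, the sketch below computes the waiting time before each redelivery under a simplified model: the first redelivery waits the initial delay and each later one multiplies the previous delay by the back-off multiplier. This is an illustration only, not ActiveMQ's implementation. It ignores options such as collision avoidance and a maximum redelivery delay, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class RedeliveryScheduleSketch {

    // Simplified exponential back-off: delay_n = initialDelay * multiplier^(n-1).
    static List<Long> delays(long initialDelayMs, double backOffMultiplier, int maximumRedeliveries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 0; attempt < maximumRedeliveries; attempt++) {
            delays.add(delay);
            delay = (long) (delay * backOffMultiplier);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same values as the policy above: 500 ms initial delay, multiplier 2, at most 2 redeliveries.
        System.out.println(delays(500, 2, 2)); // prints [500, 1000]
    }
}
```

Under this model a message that keeps failing is attempted three times in total: the original delivery plus two redeliveries.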


Edit: Taking the code snippet you added to the question, the following shows how this works with transactions. Try this code, then try it again with the Session.rollback() call commented out, and you'll see that using SESSION_TRANSACTED with Session.commit/rollback works as expected:

@Test
public void test() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        session.commit();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        session.rollback();
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    assertEquals(3, atomicInteger.get());
}

