在 ActiveMQ Artemis 中使用通配符队列时不消耗消息 [英] Messages not consumed when using wildcard queues in ActiveMQ Artemis

查看:43
本文介绍了在 ActiveMQ Artemis 中使用通配符队列时不消耗消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我在 ActiveMQ Artemis test.A 和通配符队列 test.# 上创建队列,那么我可以向 test.A 发送消息> 并且它也将被传送到 test.#.但是,当我使用来自 test.# 的消息时,我惊讶地发现该消息仍然存在于 test.A

If I create queues on ActiveMQ Artemis test.A and a wildcard queue test.# then I can send a message to test.A and it will also be delivered to test.#. However, I am surprised to learn that when I consume the message from test.# then the message is still present on test.A

如何更改我的代码或配置以获得预期的行为?

How can I change my code or configuration to get the expected behavior?

示例代码:

import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

public class Main {
    static String AMQ = "vm://0";

    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ server = null;
        try {
            server = createEmbeddedBroker();

            var serverLocator = ActiveMQClient.createServerLocator(AMQ);
            var clientSessionFactory = serverLocator.createSessionFactory();

            createQueues(clientSessionFactory);

            // queues are empty on creation
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 0);
                assertQueueLength(session, "test.A", 0);
            }
            
            sendMessage(clientSessionFactory, "test.A");

            // expect message is delivered to both
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 1);
                assertQueueLength(session, "test.A", 1);
            }

            consumeMessage(clientSessionFactory, "test.#");

            // expect message is consumed from both
            try (var session = clientSessionFactory.createSession()) {
                assertQueueLength(session, "test.#", 0); // ok - message gone
                assertQueueLength(session, "test.A", 0); // fails! 
            }

        } finally {
            if (server != null) server.stop();
        }
    }

    private static EmbeddedActiveMQ createEmbeddedBroker() throws Exception {
        var config = new ConfigurationImpl();
        config.addAcceptorConfiguration("vm", AMQ);
        config.setSecurityEnabled(false);
        config.setPersistenceEnabled(false);

        var server = new EmbeddedActiveMQ();
        server.setConfiguration(config);
        server.start();

        return server;
    }

    private static void createQueues(ClientSessionFactory csf)  throws Exception {
        var session = csf.createSession();

        /*
        <address name="test.A">
            <anycast>
                <queue name="test.A" />
            </anycast>
        </address>
        */
        var testA = new QueueConfiguration("test.A")
            .setRoutingType(RoutingType.ANYCAST)
            .setAddress("test.A");
        session.createQueue(testA);

        /*
        <address name="test.#">
            <anycast>
                <queue name="test.#" />
            </anycast>
        </address>
        */
        var testWildcard = new QueueConfiguration("test.#")
            .setRoutingType(RoutingType.ANYCAST)
            .setAddress("test.#");
        session.createQueue(testWildcard);

        // also tried to create address without a queue, but the message to test.A is not delivered to test.#
        // session.createAddress(new SimpleString("test.#"), RoutingType.ANYCAST, false);
    }

    private static void sendMessage(ClientSessionFactory csf, String queue) throws Exception {
        var session = csf.createSession();
        var producer = session.createProducer(queue);
        producer.send(session.createMessage(true));
        producer.close();
        session.close();
    }

    private static void consumeMessage(ClientSessionFactory csf, String queue) throws Exception {
        var session = csf.createSession();
        var consumer = session.createConsumer(queue);
        consumer.setMessageHandler(message -> {
            try {
                log("Consuming one message from " + queue);
                message.acknowledge();
                log("Consumed one message from " + queue);
            } catch (ActiveMQException e) {
                throw new IllegalStateException(e);
            }
        });
        session.start();

        Thread.sleep(1000); // hack to wait

        consumer.close();
        session.close();
    }

    private static void assertQueueLength(ClientSession session, String queue, long expected) throws Exception {
        long actual = session.queueQuery(SimpleString.toSimpleString(queue)).getMessageCount();
        if (actual != expected) {
            throw new IllegalStateException("Queue " + queue + " has " + actual + " messages. Expected " + expected);
        } else {
            log("Queue " + queue + " has " + actual + " messages as expected");
        }
    }

    private static void log(String msg) {
        System.out.println(">>> " + msg);
    }
}

依赖关系:

org.apache.activemq:artemis-core-client:2.17.0
org.apache.activemq:artemis-server:2.17.0

推荐答案

您所看到的预期的行为.这里要记住的关键是你正在利用 通配符路由而不是通配符消耗.使用通配符路由消息不仅会路由到显式发送消息的地址的队列,还会路由到匹配通配符地址的任何队列.消息路由到的每个队列都有自己的消息副本.

What you are seeing is the expected behavior. The key thing to keep in mind here is that you're leveraging wildcard routing and not wildcard consuming. With wildcard routing messages are not only routed to the queues of the address where the message is explicitly sent but also to any queues on matching wildcard addresses. Each queue to which the message is routed has its own copy of the message.

通配符路由是在考虑多播(即发布/订阅)用例(例如分层主题)的情况下实现的,但如果您想将其与任播一起使用,则有几个选项:

Wildcard routing was implemented with multicast (i.e. pub/sub) use-cases in mind (e.g. hierarchical topics), but there are a few options if you want to use it with anycast:

  • 按原样接受语义.
  • 创建地址test.A 没有队列,例如:
session.createAddress(SimpleString.toSimpleString("test.A"), RoutingType.ANYCAST, false);

这是一个完全有效的配置,但您将无法直接使用来自 test.A 的消息,因为不存在这样的队列.您只能使用通配符地址上的队列中的消息.

  • test.A 队列中将 purge-on-no-consumer 设置为 true,例如:

    This is a perfectly valid configuration, but you won't be able to consume messages from test.A directly since no such queue exists. You'd only be able to consume messages from the queue(s) on the wildcard address.

  • Set purge-on-no-consumer to true on the test.A queue, e.g.:

    var testA = new QueueConfiguration("test.A")
        .setRoutingType(RoutingType.ANYCAST)
        .setPurgeOnNoConsumers(true)
        .setAddress("test.A");
    

    此设置将允许队列在消费者连接时接收消息,但只要最后一个消费者断开连接,队列中的所有消息就会被清除,并且只要没有消费者,消息就不会路由到它.

  • This setting will allow the queue to receive messages while a consumer is connected, but as soon as the last consumer disconnects the queue will be purged of all messages and as long as there are no consumers messages will not be routed to it.

    这篇关于在 ActiveMQ Artemis 中使用通配符队列时不消耗消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆