JMS - 从一个消费者到多个消费者 [英] JMS - Going from one to multiple consumers

查看:156
本文介绍了JMS - 从一个消费者到多个消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个JMS客户端正在生成消息并通过JMS队列发送给其唯一的消费者。

I have a JMS client which is producing messages and sending over a JMS queue to its unique consumer.

我想要的是不止一个消费者获取这些消息。我想到的第一件事就是将队列转换为主题,因此当前和新的消费者可以订阅并将相同的消息传递给所有消费者。

What I want is more than one consumer getting those messages. The first thing that comes to my mind is converting the queue to a topic, so current and new consumers can subscribe and get the same message delivered to all of them.

这显然将涉及修改生产者和消费者方面的当前客户代码。

This will obviously involve modifying the current clients code in both producer and consumer side of things.

我还想看看其他选项,比如创建第二个队列,这样我不必修改现有的消费者。我相信这种方法有一些优点(如果我错了,请纠正我)平衡两个不同队列之间的负载而不是一个,这可能会对性能产生积极影响。

I would like to also look at other options like creating a second queue, so that I don't have to modify the existing consumer. I believe there are advantages in this approach like (correct me if I am wrong) balancing the load between two different queues rather than one, which might have a positive impact on performance.

我想就你可能会看到的这些选项和缺点/专业人士提出建议。任何反馈都非常感谢。

I would like to get advise on these options and cons / pros that you might see. Any feedback is highly appreciated.

推荐答案

您有几个选项,如您所述。

You have a few options as you stated.

如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者。如果您的消费者不活着,那么队列提供的一件事就是持久性。这将取决于您正在使用的MQ系统。

If you convert it to a topic to get the same effect you will need to make the consumers persistent consumers. One thing the queue offers is persistence if your consumer isn't alive. This will depend on the MQ system you are using.

如果您想坚持使用队列,您将为每个消费者创建一个队列,并为将侦听该消息的调度员创建一个队列。原始队列。

If you want to stick with queues, you will create a queue for each consumer and a dispatcher that will listen on the original queue.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

主题优点


  • 更容易动态添加新消费者。所有消费者都会在没有任何工作的情况下收到新消息。

  • 您可以创建循环主题,以便Consumer_1将收到消息,然后消费者_2,然后消费者_3

  • 消费者可以推送新消息,而不必查询队列,使其成为被动的。

主题的缺点


  • 除非您的代理支持此配置,否则消息不会持久。如果消费者下线并返回,除非设置了持久性消费者,否则可能会丢失消息。

  • 很难让Consumer_1和Consumer_2接收消息而不是Consumer_3。使用Dispatcher和Queues,Dispatcher无法在Consumer_3的队列中放置消息。

队列优点


  • 消息持续到消费者删除消息

  • 调度员可以过滤哪些消费者获取哪些消息通过不将消息放入相应的消费者队列中。这可以通过过滤主题来完成。

队列缺点


  • 需要创建其他队列以支持多个消费者。在动态环境中这不会有效。

在开发消息系统时,我更喜欢主题,因为它给了我最大的力量,但是,当您已经在使用队列时,它会要求您更改系统的工作方式以实现主题。

When developing a Messaging System I prefer topics as it gives me the most power, but seeing as you are already using Queues it would require you to change how your system works to implement Topics instead.

设计和实施包含多个消费者的队列系统

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

来源

请记住,您需要处理其他事项,例如问题异常处理,重新连接到连接和队列,如果您丢失连接等等。这只是为了给您一个关于如何完成我所描述的内容的想法。

Keep in mind there are other things you'll need to take care of such as problem exception handling, reconnection to the connection and queues if you lose your connection, etc. This is just designed to give you an idea of how to accomplish what I described.

在实际系统中,我可能不会在第一个异常时退出。我会允许系统继续尽可能地运行并记录错误。正如在此代码中所示,如果在单个使用者队列中放入消息失败,则整个调度程序将停止。

In a real system I probably wouldn't exit out at the first exception. I would allow the system to continue operating the best it could and log errors. As it stands in this code if putting a message in a single consumers queue fails, the whole dispatcher will stop.

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();

这篇关于JMS - 从一个消费者到多个消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆