在ActiveMQ中配置网络代理后,并非所有使用者都使用该消息 [英] Message is not consumed by all consumers when network brokers is configured in ActiveMQ

查看:96
本文介绍了在ActiveMQ中配置网络代理后,并非所有使用者都使用该消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在同一台计算机上有2个应用程序实例(尽管它也可以在不同的计算机上),并且具有两个具有不同端口的Tomcat实例,并且该应用程序中嵌入了Apache ActiveMQ.

我已经配置了一个静态的代理网络,以便来自一个实例的消息也可以被所有其他实例使用(每个实例可以是生产者和使用者).

servlet:

package com.activemq.servlet;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;

/**
 * Servlet implementation class ActiveMQStartUpServlet
 */
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private ActiveMQStartup mqStartup = null;
    private static final Map pooledPublishers = new HashMap();

    @Override
    public void init(ServletConfig config) throws ServletException {
        System.out.println("starting servelt--------------");
        super.init(config);
        //Apache Active MQ Startup
        mqStartup = new ActiveMQStartup();
        mqStartup.startBrokerService();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println(req.getParameter("distributedMsg"));
        String mqConfig = null;
        String distributedMsg = req.getParameter("distributedMsg");
        String simpleMsg = req.getParameter("simpleMsg");
        if (distributedMsg != null && !distributedMsg.equals(""))
            mqConfig = "distributedMsg";
        else if (simpleMsg != null && !simpleMsg.equals(""))
            mqConfig = "simpleMsg";
        MQPublisher publisher = acquirePublisher(mqConfig);
        try {
            publisher.publish(mqConfig);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            releasePublisher(publisher);
        }
    }

    @SuppressWarnings("unchecked")
    private void releasePublisher(MQPublisher publisher) {
        if (publisher == null) return;
        @SuppressWarnings("rawtypes")
        LinkedList publishers;
        TestPublisher poolablePublisher = (TestPublisher)publisher;
        publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
        synchronized (publishers) {
            publishers.addLast(poolablePublisher);
        }

    }

    private MQPublisher acquirePublisher(String mqConfig) {
        LinkedList publishers = getPooledPublishers(mqConfig);
        MQPublisher publisher = getMQPubliser(publishers);
        if (publisher != null) return publisher;
        try {
            if (mqConfig.equals("distributedMsg"))
                return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
            else    
                return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
        }catch(Exception e){
            e.printStackTrace();
        }
        return null;
    }

    private LinkedList getPooledPublishers(String mqConfig) {
         LinkedList publishers = null;
         publishers = (LinkedList) pooledPublishers.get(mqConfig);
         if (publishers == null) {
             synchronized(pooledPublishers) {
                 publishers = (LinkedList) pooledPublishers.get(mqConfig);
                 if (publishers == null) {
                     publishers = new LinkedList();
                     pooledPublishers.put(mqConfig, publishers);
                 }
             }
         }
        return publishers;
    }

    private MQPublisher getMQPubliser(LinkedList publishers) {
        synchronized (publishers) {
            while (!publishers.isEmpty()) {
                TestPublisher publisher = (TestPublisher)publishers.removeFirst();
                return publisher;
            }
        }
        return null;
    }




}

配置:

package com.activemq.servlet;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.ActiveMQContext;

public class MQConfiguration {
    private static final Map configurations = new HashMap();
    private String mqConfig;
    private String topicName;
    private TopicConnection topicConnection = null;

    private MQConfiguration(String mqConfig, String string, String string2) {
        this.mqConfig = mqConfig;

        try {
            String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
            this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
            TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
                    .lookup(topicFactoryConName);
            this.topicConnection = factory.createTopicConnection();
            this.topicConnection.start();
        } catch (Exception e) {
            System.out.println("error: " + e);
        }
    }

    public static MQConfiguration getConfiguration(String mqConfig) {
        if (mqConfig == null || "".equals(mqConfig)) {
            throw new IllegalArgumentException("mqConfig is null or empty");
        }

        MQConfiguration config = null;

        if (config != null) {
            return config;
        }
        synchronized (configurations) {
            config = (MQConfiguration) configurations.get(mqConfig);
            if (config == null) {
                config = new MQConfiguration(mqConfig, "userName", "userPassword");
            }
            configurations.put(mqConfig, config);
        }

        return config;
    }

    public String getMqConfig() {
        return this.mqConfig;
    }

    public TopicSession createTopicSession(boolean isTransacted, int autoAcknowledge) throws JMSException {
        if (this.topicConnection == null) {
            IllegalStateException ise = new IllegalStateException("topic connection not configured");
            throw ise;
        }
        return this.topicConnection.createTopicSession(isTransacted, autoAcknowledge);
    }

    public Topic getTopic() {
        try {
            return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
        } catch (Exception e) {
            e.getMessage();
        }
        return null;
    }
}

发布者:

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import com.activemq.servlet.MQConfiguration;

public class TestPublisher implements MQPublisher {
    private final String configurationName;
    private TopicSession topicSession = null;
    private TopicPublisher topicPublisher = null;

    public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
        if (config == null) {
            throw new IllegalArgumentException("config == null");
        }
        Topic topic = config.getTopic();
        this.configurationName = config.getMqConfig();
        this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        this.topicPublisher = this.topicSession.createPublisher(topic);
        MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
        msgConsumer.setMessageListener((MessageListener) messageListener);
    }

    @Override
    public void publish(String msg) throws JMSException {
        this.topicPublisher.publish(createMessage(msg, this.topicSession));
    }

    private Message createMessage(String msg, Session session) throws JMSException {
        TextMessage message = session.createTextMessage(msg);
        return message;
    }

    public String getConfigurationName() {
        return this.configurationName;
    }
}

消费者:

package com.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;

public class SendMsgToAllInstance implements MessageListener {

    @Override
    public void onMessage(Message arg0) {
        System.out.println("distributed message-------------");

        // We have call to dao layer to to fetch some data and cached it

    }

}

JNDI:activemq-jndi.properties

# JNDI properties file to setup the JNDI server within ActiveMQ

#
# Default JNDI properties settings
#
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
activemq.network.connector=static:(tcp://localhost:61620)

#activemq.network.connector=broker:(tcp://localhost:61619,network:static:tcp://localhost:61620)?persistent=false&useJmx=true
activemq.data.directory=data61619
activemq.jmx.port=1099

#
# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
#
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory
topic.jms/distributedTopic=distributedTopic
topic.jms/normalTopic=normalTopic

distributedMsg=distributedMsgFactory
simpleMsg=simpleMsgFactory

distributedTopic=jms/distributedTopic
normalTopic=jms/normalTopic

ActiveMQStartup :

package com.activemq;

import java.net.URI;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;

public class ActiveMQStartup {
    private final String bindAddress;
    private final String dataDirectory;
    private BrokerService broker = new BrokerService();
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final int consumerTTL = 2;
    protected final boolean dynamicOnly = true;
    protected final String networkBroker;
    protected final String jmxPort;

    public ActiveMQStartup() {
        ActiveMQContext context = new ActiveMQContext();
        context.loadJndiProperties();
        bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
        dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
        networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
        jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");
    }

    // Start activemq broker service
    public void startBrokerService() {
        try {
            broker.setDataDirectory("../" + dataDirectory);
            broker.setBrokerName(dataDirectory);
            broker.setUseShutdownHook(true);
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(bindAddress));         

            //broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            ManagementContext mgContext = new ManagementContext();
            if (networkBroker != null && !networkBroker.isEmpty()) {
                NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
                networkConnector.setName(dataDirectory);
                mgContext.setConnectorPort(Integer.parseInt(jmxPort));
                broker.setManagementContext(mgContext);
                configureNetworkConnector(networkConnector);
            }
            broker.setNetworkConnectorStartAsync(true);
            broker.addConnector(connector);
            broker.start();
        } catch (Exception e) {
            System.out.println("Failed to start Apache MQ Broker : " + e);
        }
    }

    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(true);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
        networkConnector.setConsumerTTL(consumerTTL);
        //networkConnector.setStaticBridge(true);
    }

    // Stop broker service
    public void stopBrokerService() {
        try {
            broker.stop();
        } catch (Exception e) {
            System.out.println("Unable to stop the ApacheMQ Broker service " + e);
        }
    }
}

我正在逐个启动tomcat实例,并发现代理之间的网络连接已建立.

当我第一次从instance1或instance2发送消息时,它仅在该实例上使用,但是当我从第二个实例发送消息时,两者都使用了该消息.

git中的代码: https://github.com/AratRana/ApacheActiveMQ

你能指出我哪里错了吗?

解决方案

最后,我能够做到.在服务器启动期间启动使用者时,我可以在所有实例中看到消息使用者.因此,要实现这一点,需要在发布任何消息之前启动使用者.

I have 2 instances of my application on the same machine (although it could be on different machines as well) with two Tomcat instances with different ports and Apache ActiveMQ is embedded in the application.

I have configured a static network of brokers so that the message from one instance can be consumed by all other instance as well (each instance can be producer and consumer).

servlet:

package com.activemq.servlet;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;

/**
 * Servlet implementation class ActiveMQStartUpServlet
 */
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private ActiveMQStartup mqStartup = null;
    private static final Map pooledPublishers = new HashMap();

    @Override
    public void init(ServletConfig config) throws ServletException {
        System.out.println("starting servelt--------------");
        super.init(config);
        //Apache Active MQ Startup
        mqStartup = new ActiveMQStartup();
        mqStartup.startBrokerService();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println(req.getParameter("distributedMsg"));
        String mqConfig = null;
        String distributedMsg = req.getParameter("distributedMsg");
        String simpleMsg = req.getParameter("simpleMsg");
        if (distributedMsg != null && !distributedMsg.equals(""))
            mqConfig = "distributedMsg";
        else if (simpleMsg != null && !simpleMsg.equals(""))
            mqConfig = "simpleMsg";
        MQPublisher publisher = acquirePublisher(mqConfig);
        try {
            publisher.publish(mqConfig);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            releasePublisher(publisher);
        }
    }

    @SuppressWarnings("unchecked")
    private void releasePublisher(MQPublisher publisher) {
        if (publisher == null) return;
        @SuppressWarnings("rawtypes")
        LinkedList publishers;
        TestPublisher poolablePublisher = (TestPublisher)publisher;
        publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
        synchronized (publishers) {
            publishers.addLast(poolablePublisher);
        }

    }

    private MQPublisher acquirePublisher(String mqConfig) {
        LinkedList publishers = getPooledPublishers(mqConfig);
        MQPublisher publisher = getMQPubliser(publishers);
        if (publisher != null) return publisher;
        try {
            if (mqConfig.equals("distributedMsg"))
                return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
            else    
                return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
        }catch(Exception e){
            e.printStackTrace();
        }
        return null;
    }

    private LinkedList getPooledPublishers(String mqConfig) {
         LinkedList publishers = null;
         publishers = (LinkedList) pooledPublishers.get(mqConfig);
         if (publishers == null) {
             synchronized(pooledPublishers) {
                 publishers = (LinkedList) pooledPublishers.get(mqConfig);
                 if (publishers == null) {
                     publishers = new LinkedList();
                     pooledPublishers.put(mqConfig, publishers);
                 }
             }
         }
        return publishers;
    }

    private MQPublisher getMQPubliser(LinkedList publishers) {
        synchronized (publishers) {
            while (!publishers.isEmpty()) {
                TestPublisher publisher = (TestPublisher)publishers.removeFirst();
                return publisher;
            }
        }
        return null;
    }




}

Configuration:

package com.activemq.servlet;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.ActiveMQContext;

public class MQConfiguration {
    private static final Map configurations = new HashMap();
    private String mqConfig;
    private String topicName;
    private TopicConnection topicConnection = null;

    private MQConfiguration(String mqConfig, String string, String string2) {
        this.mqConfig = mqConfig;

        try {
            String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
            this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
            TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
                    .lookup(topicFactoryConName);
            this.topicConnection = factory.createTopicConnection();
            this.topicConnection.start();
        } catch (Exception e) {
            System.out.println("error: " + e);
        }
    }

    public static MQConfiguration getConfiguration(String mqConfig) {
        if (mqConfig == null || "".equals(mqConfig)) {
            throw new IllegalArgumentException("mqConfig is null or empty");
        }

        MQConfiguration config = null;

        if (config != null) {
            return config;
        }
        synchronized (configurations) {
            config = (MQConfiguration) configurations.get(mqConfig);
            if (config == null) {
                config = new MQConfiguration(mqConfig, "userName", "userPassword");
            }
            configurations.put(mqConfig, config);
        }

        return config;
    }

    public String getMqConfig() {
        return this.mqConfig;
    }

    public TopicSession createTopicSession(boolean isTransacted, int autoAcknowledge) throws JMSException {
        if (this.topicConnection == null) {
            IllegalStateException ise = new IllegalStateException("topic connection not configured");
            throw ise;
        }
        return this.topicConnection.createTopicSession(isTransacted, autoAcknowledge);
    }

    public Topic getTopic() {
        try {
            return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
        } catch (Exception e) {
            e.getMessage();
        }
        return null;
    }
}

publisher:

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import com.activemq.servlet.MQConfiguration;

public class TestPublisher implements MQPublisher {
    private final String configurationName;
    private TopicSession topicSession = null;
    private TopicPublisher topicPublisher = null;

    public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
        if (config == null) {
            throw new IllegalArgumentException("config == null");
        }
        Topic topic = config.getTopic();
        this.configurationName = config.getMqConfig();
        this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        this.topicPublisher = this.topicSession.createPublisher(topic);
        MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
        msgConsumer.setMessageListener((MessageListener) messageListener);
    }

    @Override
    public void publish(String msg) throws JMSException {
        this.topicPublisher.publish(createMessage(msg, this.topicSession));
    }

    private Message createMessage(String msg, Session session) throws JMSException {
        TextMessage message = session.createTextMessage(msg);
        return message;
    }

    public String getConfigurationName() {
        return this.configurationName;
    }
}

Consumer:

package com.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;

public class SendMsgToAllInstance implements MessageListener {

    @Override
    public void onMessage(Message arg0) {
        System.out.println("distributed message-------------");

        // We have call to dao layer to to fetch some data and cached it

    }

}

JNDI:activemq-jndi.properties

# JNDI properties file to setup the JNDI server within ActiveMQ

#
# Default JNDI properties settings
#
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
activemq.network.connector=static:(tcp://localhost:61620)

#activemq.network.connector=broker:(tcp://localhost:61619,network:static:tcp://localhost:61620)?persistent=false&useJmx=true
activemq.data.directory=data61619
activemq.jmx.port=1099

#
# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
#
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory
topic.jms/distributedTopic=distributedTopic
topic.jms/normalTopic=normalTopic

distributedMsg=distributedMsgFactory
simpleMsg=simpleMsgFactory

distributedTopic=jms/distributedTopic
normalTopic=jms/normalTopic

ActiveMQStartup:

package com.activemq;

import java.net.URI;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;

public class ActiveMQStartup {
    private final String bindAddress;
    private final String dataDirectory;
    private BrokerService broker = new BrokerService();
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final int consumerTTL = 2;
    protected final boolean dynamicOnly = true;
    protected final String networkBroker;
    protected final String jmxPort;

    public ActiveMQStartup() {
        ActiveMQContext context = new ActiveMQContext();
        context.loadJndiProperties();
        bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
        dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
        networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
        jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");
    }

    // Start activemq broker service
    public void startBrokerService() {
        try {
            broker.setDataDirectory("../" + dataDirectory);
            broker.setBrokerName(dataDirectory);
            broker.setUseShutdownHook(true);
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(bindAddress));         

            //broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            ManagementContext mgContext = new ManagementContext();
            if (networkBroker != null && !networkBroker.isEmpty()) {
                NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
                networkConnector.setName(dataDirectory);
                mgContext.setConnectorPort(Integer.parseInt(jmxPort));
                broker.setManagementContext(mgContext);
                configureNetworkConnector(networkConnector);
            }
            broker.setNetworkConnectorStartAsync(true);
            broker.addConnector(connector);
            broker.start();
        } catch (Exception e) {
            System.out.println("Failed to start Apache MQ Broker : " + e);
        }
    }

    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(true);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
        networkConnector.setConsumerTTL(consumerTTL);
        //networkConnector.setStaticBridge(true);
    }

    // Stop broker service
    public void stopBrokerService() {
        try {
            broker.stop();
        } catch (Exception e) {
            System.out.println("Unable to stop the ApacheMQ Broker service " + e);
        }
    }
}

I am starting the tomcat instance one by one and seeing the network connection between the broker is getting established.

When I am sending messge from instance1 or instance2(first time) it is consuming on that instance only, but when I am sending message from the second instance it is consumed by both;

Code in git: https://github.com/AratRana/ApacheActiveMQ

Could you point me where I am wrong?

解决方案

Finally, I am able to do it. When I started the consumer during server startup then I am able to see the message consumer in all instances. So to achieve this the consumers needs to be started before publishing any message.

这篇关于在ActiveMQ中配置网络代理后,并非所有使用者都使用该消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆