JMS 2.0-如何从共享使用者那里接收来自主题的消息? [英] JMS 2.0 - How to receive messages from topic with shared consumers?

查看:144
本文介绍了JMS 2.0-如何从共享使用者那里接收来自主题的消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用ActiveMQ Artemis和JMS 2.0来与共享使用者一起阅读主题消息.我有两个问题:

I am using ActiveMQ Artemis and JMS 2.0 for reading topic messages with shared consumers. I have two questions:

  1. 有什么方法可以使用xml格式的配置.
  2. 当我在使用者上设置消息侦听器时,是否必须使用while循环?如果我不使用while (true)循环,则当主题没有消息时,程序将终止.
  1. Is there any way to use configuration with xml format.
  2. When I set the message listener on the consumer is it mandatory to use a while loop? If I don't use while (true) loop the program will terminate when topic has no messages.

SharedConsumer.java

public class SharedConsumer {
    @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
    ConnectionFactory connectionFactory;

    public String maxConnectionForJSON;

    public void readFromTopicAndSendToQueue()throws Exception{
        Context initialContext = null;
        JMSContext jmsContext = null;
        int maxConnectionCount = 0;

        maxConnectionForJSON = "30";

        if (!StringUtils.isBlank(maxConnectionForJSON)){
            try{
                maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
            }catch (Exception e){
                //logging
            }
        }
        if (maxConnectionCount != 0) {
            try {
                List<JMSConsumer> jmsConsumerList = new ArrayList<>();
                initialContext = new InitialContext();

                Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");

                ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

                jmsContext = cf.createContext("admin", "admin");

                for (int i = 0; i < maxConnectionCount; i++){
                    JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
                    MessageListener listener = new Listener();
                    jmsConsumer.setMessageListener(listener);

                }
                while (true) {
                    Thread.sleep(30000);
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            } finally {
                 if (initialContext != null) {
                     initialContext.close();
                 }
                 if (jmsContext != null) {
                     jmsContext.close();
                 }
            }
        }
    }

    public static void main(final String[] args) throws Exception {
        SharedConsumer sharedConsumer = new SharedConsumer();
        sharedConsumer.readFromTopicAndSendToQueue();
    }
}

SharedConsumerListener.java

public class Listener implements MessageListener {
    public static int count = 0;

    @Override
    public void onMessage(Message message) {
        System.out.println(message.toString() + "\ncount :" + count);
        count++;
    }

}

我可以使用xml文件读取JMS 1.1(ActiveMQ)中的Queue.我以为我们可以使用下面的JMS 2.0 Artemis中的配置文件,但是我错了.非常感谢您对Justin Bertram的帮助.

I could use xml file for reading Queue in JMS 1.1 (ActiveMQ). I thought we could use with a config file like below in JMS 2.0 Artemis but I was wrong. Thank you so much for your help Justin Bertram.

在JMS 1.1配置文件中

<bean id="brokerUrl" class="java.lang.String">
   <constructor-arg value="#{appProperties.queueUrl}"/>
</bean>

<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>

<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
   <constructor-arg ref="amqConnectionFactory"/>
   <property name="maxConnections" value="#{appProperties.maxConnections}"/>
   <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
   <property name="maximumActiveSessionPerConnection" value = "10"/> 

</bean>

<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
   <constructor-arg ref="connectionFactory1"/>
</bean>

<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
   <constructor-arg value="#{appProperties.queueName}"/>
</bean>

<task:executor id="mainExecutorForJSON" pool-size="#{appProperties.mainExecutorForJSONPoolSize}"
               queue-capacity="0" rejection-policy="CALLER_RUNS"/>

<int:channel id="jmsInChannelForJSON" >
    <int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONNrtQueue" channel="jmsInChannelForJSON"
                                        concurrent-consumers="#{appProperties.concurrentConsumerCountForJSON}" />

<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS" />

推荐答案

总之,是的,一旦设置了JMS使用者的消息侦听器,阻止程序终止是正常的.

In short, yes it is normal to prevent the program from terminating once you set a JMS consumer's message listener.

创建JMS使用者并设置其消息侦听器时,JMS客户端实现将在后台创建新线程,以从创建使用者并设置侦听器的线程异步侦听消息.因此,创建使用者并设置侦听器的线程将继续进行.在您的情况下,您需要以某种方式停止线程退出并终止应用程序,因此您需要while循环.

When you create a JMS consumer and set its message listener the JMS client implementation will create new threads in the background to listen for messages asynchronously from the thread which created the consumer and set the listener. Therefore the thread which creates the consumer and sets the listener will simply carry on. In your case you need to somehow stop the thread from exiting and terminating the application therefore you need the while loop.

这篇关于JMS 2.0-如何从共享使用者那里接收来自主题的消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆