Apache ActiveMQ Artemis client reconnecting to next available broker in clustered HA replication/shared-data store

Question

Below is the broker.xml for host1 (the master). The broker.xml for host2 is the same except that the port number changes to 61616 and the HA policy is configured as slave. This follows on from the question "Apache Artemis client fail over discovery".

<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
   <core xmlns="urn:activemq:core">

      <bindings-directory>./data/bindings</bindings-directory>

      <journal-directory>./data/journal</journal-directory>

      <large-messages-directory>./data/largemessages</large-messages-directory>

      <paging-directory>./data/paging</paging-directory>
      <!-- Connectors -->

      <connectors>
         <connector name="netty-connector">tcp://10.64.60.100:61617</connector> <!-- direct IP address of host myhost1 -->
         <connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- IP 10.64.60.101 (mocked-up IP for security reasons) -->
      </connectors>

      <!-- Acceptors -->
      <acceptors>
         <acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
      </acceptors>

      <ha-policy>
         <replication>
            <master/>
         </replication>
      </ha-policy>

      <cluster-connections>
         <cluster-connection name="myhost1-cluster">
            <connector-ref>netty-connector</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>ON_DEMAND</message-load-balancing>
            <max-hops>1</max-hops>
            <static-connectors>
               <connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
            </static-connectors>
         </cluster-connection>
      </cluster-connections>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-created-queues>false</auto-delete-created-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
         </address-setting>
      </address-settings>
   </core>
</configuration>

The client below uses Java and a Camel ProducerTemplate to push messages to a direct endpoint, from which they are routed to a queue:

Java-camel-producer-client

    package com.demo.artemis;

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.spring.SpringCamelContext;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class ToRoute {

        public static void main(String[] args) throws Exception {
            ApplicationContext appContext = new ClassPathXmlApplicationContext(
                    "camel-context-producer.xml");
            ProducerTemplate template = null;
            CamelContext camelContext = SpringCamelContext.springCamelContext(
                    appContext, false);
            try {
                camelContext.start();
                template = camelContext.createProducerTemplate();
                String msg = null;
                int loop = 0;
                int i = 0;
                // Keep sending numbered messages; stop at the second counter reset.
                while (true) {
                    if (i % 10000 == 0) {
                        i = 1;
                        loop = loop + 1;
                    }
                    if (loop == 2) break;
                    msg = "---> " + (++i);
                    template.sendBody("direct:toDWQueue", msg);
                }
            } finally {
                if (template != null) {
                    template.stop();
                }
                camelContext.stop();
            }
        }
    }

Camel context which sends the messages to the queue (call it camel-producer-client):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">
    <bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
      <constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
    </bean>

    <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
      <property name="maxConnections" value="10" />
      <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
      <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
      <property name="concurrentConsumers" value="5" />
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
      <property name="configuration" ref="jmsConfig" />
      <property name="streamMessageTypeEnabled" value="true"/>
    </bean>

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="myqueue" uri="jms:queue:myExampleQueue" />

      <route>
        <from uri="direct:toMyExample"/>
        <transform>
          <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:myqueue"/>
      </route>
    </camelContext>

  </beans>

Camel consumer which reads from MyExampleQueue, transforms each message, and sends it to OutBoundQueue. (This context file is started using Camel's org.apache.camel.spring.Main with -ac camel-consumer-client.)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">

        <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
            <property name="environment">
                <props>
                    <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
                    <prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
                    <prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
                    <prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
                </props>
            </property>
        </bean>

        <bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
            <property name="jndiName" value="ConnectionFactory"/>
            <property name="jndiTemplate" ref="jndiTemplate"/>
            <property name="cache" value="true"/>
        </bean>

        <bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
            <property name="jndiTemplate" ref="jndiTemplate"/>
            <property name="cache" value="true"/>
            <!-- dynamic destination if the destination name is not found in JNDI -->
            <property name="fallbackToDynamicDestination" value="true"/>
        </bean>

        <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="jndiFactoryBean"/>
            <property name="destinationResolver" ref="jndiDestinationResolver"/>
            <property name="concurrentConsumers" value="10" />
        </bean>

        <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
            <property name="configuration" ref="jmsConfig" />
        </bean>

        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
            <endpoint id="inqueue" uri="jms:queue:MyExampleQueue" />
            <endpoint id="outqueue" uri="jms:queue:OutBoundQueue" />

            <route>
                <from uri="ref:inqueue" />
                <convertBodyTo type="java.lang.String" />
                <transform>
                    <simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
                </transform>
                <to uri="ref:outqueue" />
            </route>
        </camelContext>
  </beans>

I think the issue I am facing is similar to this thread: http://activemq.2283324.n4.nabble.com/Connection-to-the-backup-after-master-shutdown-td4753770.html

When using the Camel context, I see that the client consumer is redirected to the slave when the master is not available (I was using JNDI with JmsPoolConnectionFactory).

The Java-producer-client sends messages through the producer template to a direct endpoint, where they are transformed and routed to the queue. When the master went down, the client was not able to fail over and connect to the slave. (Question: is this the expected behavior when the brokers are in HA mode?)

After the reconnect attempt, I see the exception below when the master is down.

Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response

On the other hand, the camel-context-consumer, which was started using Main, was able to fail over to the slave automatically. In the console I noticed that the consumer count of 10 appeared on the slave host and that it was processing data. But when the master came back up, the client didn't switch over to the live master node. Question: is this also expected?

The following convention is used for creating the connection factory:

(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;

Question: Do we need to configure the broadcast and discovery groups even when using a static connector?

Recommended answer

The error stating "Unblocking a blocking call that will never get a response" is expected if failover happens while the client is in the middle of a blocking call (e.g. sending a durable message and waiting for an ack from the broker, committing a transaction, etc.). This is discussed further in the documentation.
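Because the exception surfaces in the application, the producer has to decide what to do when a blocking call is interrupted by failover. One common option is to retry the call. The following is only a minimal sketch of that idea: the class name `RetryOnFailover` and the method `withRetry` are hypothetical, and a generic `Exception` stands in for `javax.jms.JMSException` so the example runs without broker libraries. In the question's client, the wrapped action would be the `template.sendBody(...)` call.

```java
import java.util.concurrent.Callable;

public class RetryOnFailover {

    /**
     * Runs the action and retries it when it throws, pausing between attempts.
     * Hypothetical helper: a real client would likely catch only the
     * JMS/Camel exception types that indicate a connection failure.
     */
    public static <T> T withRetry(Callable<T> action, int maxAttempts, long pauseMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMillis); // give the backup time to become live
                }
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send: the first two calls fail as if failover interrupted them.
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("AMQ219016 (simulated)");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Note that when a blocking send is unblocked this way, the client cannot tell whether the broker received the message, so a retried send may produce a duplicate unless duplicate detection is in use.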

The fact that clients don't switch back to the master broker when it comes back is also expected given your configuration. In short, you haven't configured failback properly. Your master should have:

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

Your slave should have:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

Lastly, you do not need to configure the broadcast and discovery groups when using a static connector.
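On the client side, the counterpart of a static connector list is a connection URL that names each broker explicitly, as the question's connection factories already do. As a small illustration, the sketch below assembles such a URL from a list of host:port pairs. The class `FailoverUrl` and its `build` method are hypothetical helpers, not part of Artemis, and the URL layout simply mirrors the one shown in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FailoverUrl {

    /** Joins host:port pairs into an HA connection URL in the question's format. */
    public static String build(List<String> hostPorts, int reconnectAttempts) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one broker is required");
        }
        String brokers = hostPorts.stream()
                .map(hp -> "tcp://" + hp)
                .collect(Collectors.joining(","));
        // ha=true and reconnectAttempts are the parameters used in the question.
        return "(" + brokers + ")?ha=true;reconnectAttempts=" + reconnectAttempts;
    }

    public static void main(String[] args) {
        // Host names and ports are taken from the question's consumer configuration.
        System.out.println(build(Arrays.asList("myhost1:61617", "myhost2:61616"), -1));
    }
}
```

Running `main` prints `(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;reconnectAttempts=-1`, which can be passed to the connection factory as in the Spring configuration above.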
