Apache ActiveMQ Artemis client reconnecting to next available broker in clustered HA replication/shared-data store


Question

The broker.xml for host1 is shown below. The broker.xml for host2 is the same except that the port number changes to 61616 and the HA policy is configured as slave. This is in reference to "Apache Artemis client fail over discovery".

<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
   <core xmlns="urn:activemq:core">

      <bindings-directory>./data/bindings</bindings-directory>

      <journal-directory>./data/journal</journal-directory>

      <large-messages-directory>./data/largemessages</large-messages-directory>

      <paging-directory>./data/paging</paging-directory>
      <!-- Connectors -->

      <connectors>
         <connector name="netty-connector">tcp://10.64.60.100:61617</connector> <!-- direct IP address of host myhost1 -->
         <connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- IP 10.64.60.101 (mocked-up IP for security reasons) -->
      </connectors>

      <!-- Acceptors -->
      <acceptors>
         <acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
      </acceptors>

      <ha-policy>
         <replication>
            <master/>
         </replication>
      </ha-policy>

      <cluster-connections>
         <cluster-connection name="myhost1-cluster">
            <connector-ref>netty-connector</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>ON_DEMAND</message-load-balancing>
            <max-hops>1</max-hops>
            <static-connectors>
               <connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
            </static-connectors>
         </cluster-connection>
      </cluster-connections>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-created-queues>false</auto-delete-created-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
         </address-setting>
      </address-settings>
   </core>
</configuration>

The client below uses Java and a Camel producer template to push messages to a direct endpoint, from which they are routed to the queue:

Java-camel-producer-client

    package com.demo.artemis;

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.spring.SpringCamelContext;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class ToRoute {

        public static void main(String[] args) throws Exception {
            // Load the Spring context that defines the JMS component and the route.
            ApplicationContext appContext = new ClassPathXmlApplicationContext(
                    "camel-context-producer.xml");
            ProducerTemplate template = null;
            CamelContext camelContext = SpringCamelContext.springCamelContext(
                    appContext, false);
            try {
                camelContext.start();
                template = camelContext.createProducerTemplate();
                String msg = null;
                int loop = 0;
                int i = 0;
                while (true) {
                    // Reset the counter on the first pass and whenever it reaches a multiple of 10000.
                    if (i % 10000 == 0) {
                        i = 1;
                        loop = loop + 1;
                    }
                    // Stop at the second reset, i.e. after 9999 messages have been sent.
                    if (loop == 2) break;
                    msg = "---> " + (++i);
                    template.sendBody("direct:toDWQueue", msg);
                }
            } finally {
                if (template != null) {
                    template.stop();
                }
                camelContext.stop();
            }
        }
    }

Camel context that sends messages to the queue (camel-producer-client):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">
     <bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
      <constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
    </bean>

    <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
      <property name="maxConnections" value="10" />
      <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
      <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
      <property name="concurrentConsumers" value="5" />
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
      <property name="configuration" ref="jmsConfig" />
      <property name="streamMessageTypeEnabled" value="true"/>
    </bean>

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="myqueue" uri="jms:queue:myExampleQueue" />

      <route>
        <from uri="direct:toMyExample"/>
        <transform>
          <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:myqueue"/>
      </route>
    </camelContext>

  </beans>

Camel consumer that reads from MyExampleQueue, transforms the message, and sends it to OutBoundQueue. This context file (camel-consumer-client) is started using Camel's org.apache.camel.spring.Main with the -ac option:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
      <property name="environment">
        <props>
          <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
          <prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
          <prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
          <prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
        </props>
      </property>
    </bean>

    <bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiName" value="ConnectionFactory"/>
      <property name="jndiTemplate" ref="jndiTemplate"/>
      <property name="cache" value="true"/>
    </bean>

    <bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
      <property name="jndiTemplate" ref="jndiTemplate"/>
      <property name="cache" value="true"/>
      <!-- fall back to a dynamic destination if the destination name is not found in JNDI -->
      <property name="fallbackToDynamicDestination" value="true"/>
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
      <property name="connectionFactory" ref="jndiFactoryBean"/>
      <property name="destinationResolver" ref="jndiDestinationResolver"/>
      <property name="concurrentConsumers" value="10" />
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
      <property name="configuration" ref="jmsConfig" />
    </bean>

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="inqueue" uri="jms:queue:MyExampleQueue" />
      <endpoint id="outqueue" uri="jms:queue:OutBoundQueue" />

      <route>
        <from uri="ref:inqueue" />
        <convertBodyTo type="java.lang.String" />
        <transform>
          <simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:outqueue" />
      </route>
    </camelContext>
  </beans>

I think the issue I am facing is similar to the one reported here: http://activemq.2283324.n4.nabble.com/Connection-to-the-backup-after-master-shutdown-td4753770.html

When using the Camel context, I see that the client consumer is redirected to the slave when the master is not available (I was using JNDI with JmsPoolConnectionFactory).

The Java-producer-client sends messages through the producer template to a direct endpoint, where they are transformed and routed to the queue. When the master is down, the client is not able to fail over and connect to the slave. (Question: Is this the expected behavior when the brokers are in HA mode?)

After the reconnect attempt, I see the exception below when the master is down.

Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response

On the other hand, the camel-context-consumer, which was started using Main, was able to fail over to the slave automatically. In the console I noticed that all 10 consumers were registered on the slave host and processing data. But when the master came back up, the client did not switch back to the live master node. Question: Is this also expected?

The connection factory is created using the following URL convention:

(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;

Question: Do we need to configure the broadcast and discovery groups even when using a static connector?

Recommended answer

The error stating "Unblocking a blocking call that will never get a response" is expected if failover happens when the client is in the middle of a blocking call (e.g. sending a durable message and waiting for an ack from the broker, committing a transaction, etc.). This is discussed further in the documentation.
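
One way a client can cope with this is to catch the failure and retry the call once the connection has failed over. The sketch below is only an assumption about how that could look and is not taken from the Artemis or Camel APIs: `FailoverRetry` and `callWithRetry` are hypothetical names, and the simulated action in `main` stands in for a call such as `template.sendBody(...)` from the question. Because the broker may already have received the original message before the connection dropped, a retry can produce a duplicate unless duplicate detection is in place.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not part of Artemis or Camel): retries an action
 * that may be interrupted by a broker failover.
 */
public class FailoverRetry {

    /**
     * Runs the action, retrying up to maxAttempts times when it throws.
     * Sleeps delayMillis between attempts and rethrows the last failure.
     */
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread itself was interrupted.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Give the client time to reconnect to the backup broker.
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated send: fails twice (as if the live broker went down), then succeeds.
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated: connection failure detected");
            }
            return "sent after " + calls[0] + " attempts";
        }, 5, 10L);
        System.out.println(result); // prints "sent after 3 attempts"
    }
}
```

In the producer from the question, the call might be wrapped roughly as `FailoverRetry.callWithRetry(() -> { template.sendBody("direct:toDWQueue", msg); return null; }, 5, 1000L)`; the attempt count and delay here are arbitrary illustration values.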

The fact that clients don't switch back to the master broker when it comes back is also expected given your configuration. In short, you haven't configured failback properly. Your master should have:

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

Your slave should have:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

This is also discussed in the documentation.

Lastly, you do not need to configure the broadcast and discovery groups when using a static connector.
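
The client side of the question follows the same static approach: both brokers are listed directly in the connection URL. As a small illustration, the sketch below assembles a URL in that shape from a list of host:port pairs. `HaUrlBuilder` is a hypothetical helper, not an Artemis class, and the hosts and parameters are simply the ones that appear in the question.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a URL of the form "(tcp://a,tcp://b)?k=v;k=v". */
public class HaUrlBuilder {

    public static String build(List<String> hostPorts, Map<String, String> params) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one broker is required");
        }
        // "(tcp://host1:port1,tcp://host2:port2)"
        String brokers = hostPorts.stream()
                .map(hp -> "tcp://" + hp)
                .collect(Collectors.joining(",", "(", ")"));
        if (params.isEmpty()) {
            return brokers;
        }
        // Parameters are separated by ';' as in the question's URLs.
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(";"));
        return brokers + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("ha", "true");
        params.put("reconnectAttempts", "-1");
        String url = build(Arrays.asList("myhost1:61617", "myhost2:61616"), params);
        System.out.println(url);
        // prints "(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;reconnectAttempts=-1"
    }
}
```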
