如何使用 ABSwitchCluster 在 Kafka 集群之间进行故障转移 [英] How to use ABSwitchCluster to failover between Kafka clusters

查看:40
本文介绍了如何使用 ABSwitchCluster 在 Kafka 集群之间进行故障转移的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个应用程序,它使用 Kafka 在实例之间同步数据,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换并流式传输到另一个主题供客户端使用.

I have an application that is using Kafka to Synchronize data between instances, therefore it both produces and consumes data from Kafka, additionally the application is consuming a Kafka Topic and transforming and streaming that data into another topic for clients to consume.

我的应用程序有两个用于故障转移的集群.通过 Kafka 文档,我发现了这个https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 讨论了 ABCSwitchCluster.

My Application has two Clusters for failover. Going through the Kafka Documentation I found this https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting that talks about ABSwitchCluster.

如何使用 KafkaTemplate.send() 和 @KafkaListener 注释方法,rel="nofollow noreferrer">ABSwitchCluster 是否在 Kafka 集群出现故障时自动进行故障转移?

How can I use ABSwitchCluster to Failover Automagically if the Kafka Cluster goes down, for both KafkaTemplate.send() and @KafkaListener annotated methods?

更新更多信息

我为 KafkaTemplate.send 和 Kafka Consumer Events NonResponsiveConsumerEventListenerContainerIdleEvent

I've added some error Handlers for KafkaTemplate.send and Kafka Consumer Events NonResponsiveConsumerEvent and ListenerContainerIdleEvent

最终他们调用了一个共享的Method来切换,一个BeanPostProcessor用于实际将ABSwitchCluster添加到KafkaResourceFactoryBeans中.

Ultimately they call a shared Method to switch, and a BeanPostProcessor is used to actually add the ABSwitchCluster to KafkaResourceFactory Beans.

切换代码如下所示:

   @Autowired
   KafkaSwitchCluster kafkaSwitchCluster;

   @Autowired
   WebApplicationContext context;

   @Autowired
   KafkaListenerEndpointRegistry registry;

   /**
    *  Unable to use {@link Autowired} due to circular dependency
    *  with {@link KafkaPostProcessor}
    *  @return
    */
   public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
   { return context.getBean(DefaultKafkaProducerFactory.class); }

   /** Back-End Method to Actually Switch between the clusters */
   private void switchCluster()
   {
      if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
      else { kafkaSwitchCluster.primary(); }

      getDefaultKafkaProducerFactory().reset();

      registry.stop();
      registry.destroy();
      registry.start();

      for(MessageListenerContainer listener : registry.getListenerContainers() )
      {
         listener.stop();
         listener.start();
      }
   }

在查看测试日志时,鉴于上面的更新,看起来生产者是正确的,切换集群,但我的消费者不是.

Given the Updates Above when Looking in the Test Logs, it appears that the Producer is correctly, switching clusters, but my consumers are not.

那么我怎样才能让 @KafkaListener 消费者切换?

So how can I get the @KafkaListener consumers to switch?

推荐答案

默认的生产者和消费者工厂以及 KafkaAdminKafkaResourceFactory 的子类.

The default producer and consumer factories as well as the KafkaAdmin are suclasses of KafkaResourceFactory.

您通过调用 setBootstrapServersSupplier()ABSwitchCluster 传入.

You pass the ABSwitchCluster in by calling setBootstrapServersSupplier().

ABSwitchCluster 不会自动故障转移.

您需要自己的代码来执行故障转移,然后在生产者工厂上调用 reset() 并停止/启动所有侦听器容器 (KafkaListenerEndpointRegistry.stop()/start() 适用于所有 @KafkaListener.

You need your own code to perform the failover and then call reset() on the producer factory and stop/start all the listener containers (KafkaListenerEndpointRegistry.stop()/start() for all @KafkaListeners.

这篇关于如何使用 ABSwitchCluster 在 Kafka 集群之间进行故障转移的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆