如何使用 ABSwitchCluster 在 Kafka 集群之间进行故障转移 [英] How to use ABSwitchCluster to failover between Kafka clusters
问题描述
我有一个应用程序,它使用 Kafka 在实例之间同步数据,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换并流式传输到另一个主题供客户端使用.
I have an application that is using Kafka to Synchronize data between instances, therefore it both produces and consumes data from Kafka, additionally the application is consuming a Kafka Topic and transforming and streaming that data into another topic for clients to consume.
我的应用程序有两个用于故障转移的集群.通过 Kafka 文档,我发现了这个https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 讨论了 ABCSwitchCluster
.
My Application has two Clusters for failover. Going through the Kafka Documentation I found this
https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting that talks about ABSwitchCluster
.
如何使用 KafkaTemplate.send() 和 @KafkaListener
注释方法,rel="nofollow noreferrer">ABSwitchCluster 是否在 Kafka 集群出现故障时自动进行故障转移?
How can I use ABSwitchCluster to Failover Automagically if the Kafka Cluster goes down, for both KafkaTemplate.send()
and @KafkaListener
annotated methods?
更新更多信息
我为 KafkaTemplate.send 和 Kafka Consumer Events NonResponsiveConsumerEvent
和 ListenerContainerIdleEvent
I've added some error Handlers for KafkaTemplate.send and Kafka Consumer Events NonResponsiveConsumerEvent
and ListenerContainerIdleEvent
最终他们调用了一个共享的Method来切换,一个BeanPostProcessor
用于实际将ABSwitchCluster
添加到KafkaResourceFactory
Beans中.
Ultimately they call a shared Method to switch, and a BeanPostProcessor
is used to actually add the ABSwitchCluster
to KafkaResourceFactory
Beans.
切换代码如下所示:
@Autowired
KafkaSwitchCluster kafkaSwitchCluster;
@Autowired
WebApplicationContext context;
@Autowired
KafkaListenerEndpointRegistry registry;
/**
* Unable to use {@link Autowired} due to circular dependency
* with {@link KafkaPostProcessor}
* @return
*/
public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
{ return context.getBean(DefaultKafkaProducerFactory.class); }
/** Back-End Method to Actually Switch between the clusters */
private void switchCluster()
{
if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
else { kafkaSwitchCluster.primary(); }
getDefaultKafkaProducerFactory().reset();
registry.stop();
registry.destroy();
registry.start();
for(MessageListenerContainer listener : registry.getListenerContainers() )
{
listener.stop();
listener.start();
}
}
在查看测试日志时,鉴于上面的更新,看起来生产者是正确的,切换集群,但我的消费者不是.
Given the Updates Above when Looking in the Test Logs, it appears that the Producer is correctly, switching clusters, but my consumers are not.
那么我怎样才能让 @KafkaListener
消费者切换?
So how can I get the @KafkaListener
consumers to switch?
推荐答案
默认的生产者和消费者工厂以及 KafkaAdmin
是 KafkaResourceFactory
的子类.
The default producer and consumer factories as well as the KafkaAdmin
are suclasses of KafkaResourceFactory
.
您通过调用 setBootstrapServersSupplier()
将 ABSwitchCluster
传入.
You pass the ABSwitchCluster
in by calling setBootstrapServersSupplier()
.
ABSwitchCluster
不会自动故障转移.
您需要自己的代码来执行故障转移,然后在生产者工厂上调用 reset()
并停止/启动所有侦听器容器 (KafkaListenerEndpointRegistry.stop()/start()
适用于所有 @KafkaListener
.
You need your own code to perform the failover and then call reset()
on the producer factory and stop/start all the listener containers (KafkaListenerEndpointRegistry.stop()/start()
for all @KafkaListener
s.
这篇关于如何使用 ABSwitchCluster 在 Kafka 集群之间进行故障转移的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!