Kafka消费者健康检查 [英] Kafka consumer health check
问题描述
有没有一种简单的方法可以判断消费者(使用 spring boot 和 @KafkaListener 创建)是否正常运行?这包括 - 可以访问和轮询代理,至少分配了一个分区等.
我看到有多种方法可以订阅不同的生命周期事件,但这似乎是一个非常脆弱的解决方案.
提前致谢!
您可以使用 AdminClient
获取当前组的状态...
@SpringBootApplication公共类 So56134056Application {公共静态无效主(字符串 [] args){SpringApplication.run(So56134056Application.class, args);}@豆公共新话题话题(){return new NewTopic("so56134056", 1, (short) 1);}@KafkaListener(id = "so56134056", 主题 = "so56134056")公共无效听(字符串输入){System.out.println(in);}@豆公共 ApplicationRunner 运行程序(KafkaAdmin 管理员){返回参数 ->{尝试 (AdminClient 客户端 = AdminClient.create(admin.getConfig())) {而(真){Map地图 =client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);System.out.println(地图);System.in.read();}}};}}
<块引用>
{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8,clientId,hostId.0/10.17assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}
我们一直在考虑将 getLastPollTime()
暴露给侦听器容器 API.
getAssignedPartitions()
从 2.1.3 开始可用.
Is there a simple way to say if a consumer (created with spring boot and @KafkaListener) is operating normally? This includes - can access and poll a broker, has at least one partition assigned, etc.
I see there are ways to subscribe to different lifecycle events but this seems to be a very fragile solution.
Thanks in advance!
You can use the AdminClient
to get the current group status...
@SpringBootApplication
public class So56134056Application {
public static void main(String[] args) {
SpringApplication.run(So56134056Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56134056", 1, (short) 1);
}
@KafkaListener(id = "so56134056", topics = "so56134056")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaAdmin admin) {
return args -> {
try (AdminClient client = AdminClient.create(admin.getConfig())) {
while (true) {
Map<String, ConsumerGroupDescription> map =
client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
System.out.println(map);
System.in.read();
}
}
};
}
}
{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}
We have been thinking about exposing getLastPollTime()
to the listener container API.
getAssignedPartitions()
has been available since 2.1.3.
这篇关于Kafka消费者健康检查的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!