Kafka消费者健康检查 [英] Kafka consumer health check

查看:188
本文介绍了Kafka消费者健康检查的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有没有一种简单的方法可以判断消费者(使用 spring boot 和 @KafkaListener 创建)是否正常运行?这包括 - 可以访问和轮询代理,至少分配了一个分区等.

我看到有多种方法可以订阅不同的生命周期事件,但这似乎是一个非常脆弱的解决方案.

提前致谢!

解决方案

您可以使用 AdminClient 获取当前组的状态...

@SpringBootApplication公共类 So56134056Application {公共静态无效主(字符串 [] args){SpringApplication.run(So56134056Application.class, args);}@豆公共新话题话题(){return new NewTopic("so56134056", 1, (short) 1);}@KafkaListener(id = "so56134056", 主题 = "so56134056")公共无效听(字符串输入){System.out.println(in);}@豆公共 ApplicationRunner 运行程序(KafkaAdmin 管理员){返回参数 ->{尝试 (AdminClient 客户端 = AdminClient.create(admin.getConfig())) {而(真){Map地图 =client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);System.out.println(地图);System.in.read();}}};}}

<块引用>

{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8,clientId,hostId.0/10.17assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}

我们一直在考虑将 getLastPollTime() 暴露给侦听器容器 API.

getAssignedPartitions() 从 2.1.3 开始可用.

Is there a simple way to say if a consumer (created with spring boot and @KafkaListener) is operating normally? This includes - can access and poll a broker, has at least one partition assigned, etc.

I see there are ways to subscribe to different lifecycle events but this seems to be a very fragile solution.

Thanks in advance!

解决方案

You can use the AdminClient to get the current group status...

@SpringBootApplication
public class So56134056Application {

    public static void main(String[] args) {
        SpringApplication.run(So56134056Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56134056", 1, (short) 1);
    }

    @KafkaListener(id = "so56134056", topics = "so56134056")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            try (AdminClient client = AdminClient.create(admin.getConfig())) {
                while (true) {
                    Map<String, ConsumerGroupDescription> map =
                            client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
                    System.out.println(map);
                    System.in.read();
                }
            }
        };
    }

}

{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}

We have been thinking about exposing getLastPollTime() to the listener container API.

getAssignedPartitions() has been available since 2.1.3.

这篇关于Kafka消费者健康检查的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆