Unable to get Kafka lag from all partitions

Problem description

Is there a way to find the total Kafka lag for all consumers assigned to the same consumer group?

I can only get the lag for the partitions assigned to this consumer. For example, if only one partition is assigned to a consumer, the code below only returns the lag for that partition, not for the other partitions.

// Only covers the partitions currently assigned to this consumer instance
Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);
for (TopicPartition tp : partitionSet) {
    LOG.info("Topic:{}, EndOffset:{}, currentOffset:{}, LAG:{}",
            tp.topic(), endOffsets.get(tp), consumer.position(tp), endOffsets.get(tp) - consumer.position(tp));
}

Basically, I would like to find the sum of the lags across all partitions, to understand how far behind all consumers (in the same group) of a topic are.

Also, is there an API similar to kafka-consumer-groups, to which I can pass the bootstrap server and group as arguments to find the lag?

./kafka-consumer-groups.sh --bootstrap-server <host:port> --group <group-id> --describe

Recommended answer

The correct way to achieve this programmatically is to use the AdminClient API:

1. Get the group's committed offsets using listConsumerGroupOffsets().

2. Get the log end offsets. At the moment, you need to start a Consumer and call endOffsets() for all partitions retrieved in step 1.

In Kafka 2.5 (expected at the end of February 2020 at the time of writing), there is a new AdminClient API for retrieving log end offsets, listOffsets(), which makes it possible to compute lag with the AdminClient alone.

3. For each partition, subtract the committed offset (step 1) from the log end offset (step 2).
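Step 3 is plain arithmetic. As a minimal sketch (not Kafka client code), the Java below assumes the offsets from steps 1 and 2 have already been copied into two maps keyed by a hypothetical "topic-partition" string such as `orders-0`, so it runs without a broker. The class name `GroupLag` is also illustrative. Partitions with no committed offset or no known end offset are skipped rather than counted as zero.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Step 3 only: lag = log end offset - committed offset, per partition,
 * then summed into the total lag of the group.
 * Keys are hypothetical "topic-partition" strings (e.g. "orders-0").
 */
public class GroupLag {

    /** Lag per partition; partitions without both offsets are skipped. */
    public static Map<String, Long> lagPerPartition(Map<String, Long> committedOffsets,
                                                    Map<String, Long> endOffsets) {
        Map<String, Long> lags = new HashMap<>();
        for (Map.Entry<String, Long> e : committedOffsets.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end == null || e.getValue() == null) {
                continue; // nothing to compare for this partition
            }
            lags.put(e.getKey(), end - e.getValue());
        }
        return lags;
    }

    /** Sum of all per-partition lags. */
    public static long totalLag(Map<String, Long> committedOffsets, Map<String, Long> endOffsets) {
        long total = 0;
        for (long lag : lagPerPartition(committedOffsets, endOffsets).values()) {
            total += lag;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = Map.of("orders-0", 90L, "orders-1", 40L, "orders-2", 75L);
        Map<String, Long> end = Map.of("orders-0", 100L, "orders-1", 40L, "orders-2", 80L);
        System.out.println(lagPerPartition(committed, end)); // entry order may vary
        System.out.println("total lag = " + totalLag(committed, end)); // total lag = 15
    }
}
```

To get the lag of a single topic rather than of the whole group, filter the keys by topic name before summing.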

This is basically what kafka-consumer-groups.sh does under the covers, so check the implementation of this tool if you want more detail.
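A minimal end-to-end sketch of the three steps, assuming Kafka 2.5+ clients (so step 2 can use `Admin.listOffsets()` with `OffsetSpec.latest()` instead of starting a Consumer) and the `kafka-clients` library on the classpath. It takes the bootstrap servers and the group id as arguments, mirroring the kafka-consumer-groups.sh invocation in the question. The class name `ConsumerGroupLag` and the fallback values `localhost:9092` and `my-group` are hypothetical placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerGroupLag {

    /** Lag of every partition the group has committed offsets for (all members, all topics). */
    static Map<TopicPartition, Long> lagPerPartition(Admin admin, String groupId)
            throws ExecutionException, InterruptedException {
        // Step 1: committed offsets of the whole consumer group.
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get();

        // Step 2: log end offsets of the same partitions (Admin.listOffsets, Kafka 2.5+).
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (TopicPartition tp : committed.keySet()) {
            request.put(tp, OffsetSpec.latest());
        }
        Map<TopicPartition, ListOffsetsResultInfo> endOffsets = admin.listOffsets(request).all().get();

        // Step 3: lag = log end offset - committed offset.
        Map<TopicPartition, Long> lags = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
            ListOffsetsResultInfo end = endOffsets.get(e.getKey());
            if (e.getValue() == null || end == null) {
                continue; // no committed or end offset available for this partition
            }
            lags.put(e.getKey(), end.offset() - e.getValue().offset());
        }
        return lags;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical defaults; pass real values as: <bootstrap-servers> <group-id>
        String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        String groupId = args.length > 1 ? args[1] : "my-group";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (Admin admin = Admin.create(props)) {
            long total = 0;
            for (Map.Entry<TopicPartition, Long> e : lagPerPartition(admin, groupId).entrySet()) {
                System.out.printf("%s lag=%d%n", e.getKey(), e.getValue());
                total += e.getValue();
            }
            System.out.println("Total lag for group " + groupId + ": " + total);
        }
    }
}
```

Because it reads committed offsets rather than `consumer.position()`, this measures lag from the group's last commit, for every partition the group has committed offsets for, not only the partitions assigned to one instance. With clients older than 2.5, step 2 would instead call `endOffsets()` on a Consumer for the partitions from step 1.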