How to get the latest offset/size of a Kafka topic using KafkaAdminClient (Java) for version 2.x

Problem description

Is there a more efficient or simpler way to get the size / latest offsets of a topic's partitions using the newest Kafka client 2.4 APIs in Java, and then to calculate the lag of a consumer group by comparing that group's offsets with the size of the topic?

I know this question has been asked for older Kafka versions, and that this information can also be read from the JMX metrics exposed by Kafka, but I am stuck with a legacy app that needs to do it in Java, using the latest 2.4 Kafka libraries.

The usual way of getting this information, as far as I understand, is:

  • The easiest part: get the offsets of a consumer groupID for the topic/partitions using an API call on KafkaAdminClient, such as public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)
  • The hardest part: determine the size of the topic for each partition:
    • create a new consumer and subscribe to the topic
    • advance the consumer to the latest offset using consumer.seekToEnd(...)
    • get the position of the consumer for all partitions using consumer.position(...)

Thus, determining the last offset is a pretty heavy operation. So my question is: is there a more efficient way of getting the last offsets of a topic without using a dummy consumer, maybe in the latest 2.4 APIs? The topic/partition size is really independent of any consumer, so it seems logical to be able to get it without one.

Thanks!

Marina

Recommended answer

From outside the Kafka consuming application, you are correct: your option is to compare the partition end offsets with the latest checkpointed (committed) positions of the consumer group (assuming the consumers in question even use Kafka to store their offsets).
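The comparison described above is plain arithmetic once both sets of offsets are available. The following is a minimal sketch of that arithmetic only, not the answer author's code. The class LagCalculator, its method names, and the "topic-partition" string keys are hypothetical and used purely for illustration. In a real application the keys would likely be TopicPartition objects, the committed offsets would come from listConsumerGroupOffsets(groupId), and the end offsets could probably be read with KafkaConsumer.endOffsets(...), which needs neither subscribe() nor seekToEnd(). Treating a partition without a committed offset as committed at 0 is an assumption that only holds if the partition's log still starts at offset 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: computes consumer-group lag from two offset maps.
final class LagCalculator {

    // Lag per partition = end offset - committed offset, clamped at 0.
    // Keys are illustrative "topic-partition" strings.
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets,
                                             Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            // Assumption: no committed offset means the group has consumed nothing.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Clamp at 0: a commit may race ahead of a slightly stale end offset.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    // Total lag of the group is the sum over all partitions.
    static long totalLag(Map<String, Long> lagPerPartition) {
        long total = 0L;
        for (long value : lagPerPartition.values()) {
            total += value;
        }
        return total;
    }
}
```

Keeping the arithmetic separate from the Kafka calls means it can be checked without a broker; the two maps can be filled from whichever API the application already uses.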

There are tools that will monitor this for you, such as Burrow.

However, if you have access to the consuming application itself, there is a more accurate way. Here is a list of all consumer sensors (exposed either via the API or, by default, via JMX): https://kafka.apache.org/documentation/#consumer_fetch_monitoring

There is a per-partition records-lag metric. It is updated every time poll() is called, so it is more accurate and has lower latency than the committed offsets. The only complication is that you need to sum the values of these sensors across all partitions assigned to the consumer.

Here is how to do that via KafkaConsumer.metrics():

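      import java.util.Map;
      import org.apache.kafka.common.Metric;
      import org.apache.kafka.common.MetricName;

      // Sums the per-partition "records-lag" metrics reported by one consumer.
      private long calcTotalLag(Map<MetricName, ? extends Metric> metrics) {
         long totalLag = 0;
         for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
           MetricName metricName = entry.getKey();
           Metric metric = entry.getValue();
           Map<String, String> tags = metricName.tags();
           // Only count "records-lag" metrics that carry a "partition" tag.
           if (metricName.name().equals("records-lag") && tags.containsKey("partition")) {
              totalLag += ((Number) metric.metricValue()).longValue();
           }
         }

         return totalLag;
      }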
      
