How does Consumer.endOffsets work in Kafka?

Question

Assume I have a timer task running indefinitely that iterates over all the consumer groups in the Kafka cluster and outputs the lag, committed offset, and end offset for every partition of each group. It is similar to how the Kafka console consumer group script works, except that it covers all groups.

Something like this:

Single consumer (not working): offsets are not returned for some of the provided topic partitions (for example, 10 provided, 5 offsets returned).

static Consumer consumer;

static {
  consumer = createConsumer();
}

void run() {
  List<String> groupIds = getConsumerGroups();
  for (String groupId : groupIds) {
    List<TopicPartition> topicPartitions = getTopicPartitions(groupId);
    // Not working: offsets are missing for some partitions of some groups (10 in, 5 out)
    consumer.endOffsets(topicPartitions);
  }
}

Multiple consumers (working):

void run() {
  List<String> groupIds = getConsumerGroups();
  for (String groupId : groupIds) {
    List<TopicPartition> topicPartitions = getTopicPartitions(groupId);
    // A new consumer is created for every group
    Consumer consumer = createConsumer();
    // This works: offsets are returned for all partitions
    consumer.endOffsets(topicPartitions);
  }
}

Version: Kafka-Client 2.0.0

Am I using the consumer API incorrectly? Ideally, I would like to use a single consumer.

Let me know if you need more details.

Recommended answer

It's a bug in Fetcher.fetchOffsetsByTimes(), specifically inside the groupListOffsetRequests method: the logic did not add a partition for retry when the leader of the partition whose offset was being requested was unknown or unavailable.

This was more noticeable when a single consumer was used across the partitions of all consumer groups. For some groups, the consumer already had the topic partition leader information when the end offsets were requested. The topic partitions whose leader information was unknown or unavailable were left out of the result because of the bug.

Later, I realized it was not a good idea to pull topic partitions from each consumer group. Instead, I changed the code to read the topic partitions from AdminClient.listTopics and AdminClient.describeTopics and pass them all at once to Consumer.endOffsets.

However, this doesn't completely resolve the issue, as topics/partitions may still be unavailable or unknown between runs.
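One possible way to cope with partitions that are missing from a result is to ask again only for the missing ones. The following is a minimal sketch of that idea, not part of the original answer and not how the Kafka client handles it internally. The class name EndOffsetRetry, the method fetchWithRetry, and the maxAttempts and backoffMs parameters are all hypothetical. The helper is generic, so it does not depend on Kafka classes; the fetch function stands in for a call such as Consumer.endOffsets.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper (an assumption for illustration, not a Kafka API).
final class EndOffsetRetry {

    /**
     * Calls fetch repeatedly, each time only for the keys that are still
     * missing, until every key has a value or maxAttempts is reached.
     * Keys that never resolve are absent from the returned map.
     */
    static <K> Map<K, Long> fetchWithRetry(Collection<K> keys,
                                           Function<Collection<K>, Map<K, Long>> fetch,
                                           int maxAttempts,
                                           long backoffMs) {
        Map<K, Long> result = new HashMap<>();
        Set<K> missing = new LinkedHashSet<>(keys);
        for (int attempt = 1; attempt <= maxAttempts && !missing.isEmpty(); attempt++) {
            // Ask only for the keys that have not been resolved yet.
            Map<K, Long> fetched = fetch.apply(new ArrayList<>(missing));
            for (Map.Entry<K, Long> e : fetched.entrySet()) {
                if (e.getValue() != null && missing.remove(e.getKey())) {
                    result.put(e.getKey(), e.getValue());
                }
            }
            if (!missing.isEmpty() && attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs); // wait before asking again
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt
                    break;
                }
            }
        }
        return result;
    }
}
```

With a typed consumer, this could be called as EndOffsetRetry.fetchWithRetry(topicPartitions, consumer::endOffsets, 3, 500L), where the attempt count and backoff are arbitrary example values. Any partition still absent from the returned map should be reported as unknown rather than assumed to have an offset.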

More information can be found in KAFKA-7044 and the associated pull request. This has been fixed, and the fix is scheduled for the 2.1.0 release.
