Apache Kafka使用者组的偏移量如何过期? [英] How does an offset expire for an Apache Kafka consumer group?

查看:140
本文介绍了Apache Kafka使用者组的偏移量如何过期?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我注意到一些奇怪的行为时,我正在对一个旧主题进行一些测试.在阅读Kafka的日志时,我注意到此已删除8个过期偏移"消息:

I was making some tests on an old topic when I noticed some strange behaviours. Reading Kafka's log I noticed this "removed 8 expired offsets" message:

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

实际上,我有两个问题:

In fact, I have 2 questions:

  1. 此抵消有效期如何用于消费者组?

  1. How does this offset expiration work for a consumer group?

这个过期的偏移量是否可以解释这种行为,即我的消费者在具有 auto.offset.reset = Latest 时不轮询任何内容,但是从具有 auto.offset.reset =最早吗?

Can this expired offset explain this behaviour where my consumer would not poll anything when it had auto.offset.reset = latest, but it polled from the last committed offset when it had auto.offset.reset = earliest ?

推荐答案

Kafka默认情况下会在可配置的时间段后删除已提交的偏移量.请参阅参数 offsets.retention.minutes .即,如果消费者组在此时间段内处于非活动状态(即不提交任何偏移量),则偏移量将被删除.因此,即使使用者正在运行,如果它未提交某些分区的偏移量,则这些偏移量也会受到 offset.retention.minutes .

Kafka, by default deletes committed offsets after a configurable period of time. See parameter offsets.retention.minutes. Ie, if a consumer group is inactive (ie, does not commit any offsets) for this amount of time, the offsets get deleted. Thus, even if the consumer is running, if it does not commit offsets for some partitions, those offsets are subject to offset.retention.minutes.

如果您启动消费者,则会发生以下情况:

If you start a consumer, the following happens:

  1. 查找(有效)已提交的偏移量(针对消费者组)
  1. look for a (valid) committed offset (for the consumer group)
  1. 如果找到有效的偏移量,则从此处恢复
  2. 如果未找到有效的偏移量,请根据 auto.offset.reset 参数重置偏移量
  1. if valid offset is found, resume from there
  2. if no valid offset is found, reset offset according to auto.offset.reset parameter

因此,如果您的偏移量被删除并且 auto.offset.reset = Latest ,则在将新数据添加到主题之前,消费者不会轮询任何内容.如果 auto.offset.reset =最早,它应该占用整个主题.

Thus, if your offsets got deleted and auto.offset.reset = latest, you consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest it should consume the whole topic.

有关此 https://issues.apache.org/的讨论,请参见此JIRA.jira/browse/KAFKA-3806 https://issues.apache.org/jira/browse/KAFKA-4682

这篇关于Apache Kafka使用者组的偏移量如何过期?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆