Kafka - Why doesn't a fresh groupId return all messages in a topic when AUTO_OFFSET_RESET_CONFIG is set to "latest"?


Problem description

I am trying to implement a very simple Kafka (0.9.0.1) consumer in Scala (code below).

As I understand it, Kafka (or rather ZooKeeper) stores, for each groupId, the offset of the last consumed message for a given topic. So consider the following scenario:

  1. Yesterday, a consumer with groupId1 consumed the only 5 messages in a topic. The last consumed message now has offset 4 (the first message has offset 0).
  2. During the night, 2 new messages arrive in the topic.
  3. Today I restart the consumer with the same groupId1. There are two options:

Option 1: The consumer will read the 2 new messages that arrived during the night if I set the following property to "latest":

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

Option 2: The consumer will read all 7 messages in the topic if I set the following property to "earliest":

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

Problem: If I change the consumer's groupId to groupId2, which is a new groupId for the given topic that has never consumed any messages before, its latest offset should be 0. I was expecting that by setting

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

the consumer would, during the first execution, read all the messages stored in the topic (the equivalent of having earliest), and on subsequent executions consume only the new ones. However, this is not what happens.

If I set a new groupId and keep AUTO_OFFSET_RESET_CONFIG as latest, the consumer is not able to read any messages. What I need to do instead is set AUTO_OFFSET_RESET_CONFIG to earliest for the first run, and once an offset other than 0 exists for the groupId, I can switch to latest.

Is this how my consumer should be working? Is there a better solution than switching AUTO_OFFSET_RESET_CONFIG after the first time I run the consumer?

Below is the code I am using as a simple consumer:

import java.util.{Collections, Properties}
import java.util.concurrent.Executors

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConversions._

class KafkaTestings {

  val brokers = "listOfBrokers"
  val groupId = "anyGroupId"
  val topic = "anyTopic"

  val props = createConsumerConfig(brokers, groupId)
  // The original snippet used `consumer` without defining it
  val consumer = new KafkaConsumer[String, String](props)

  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "12321")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic))

    Executors.newSingleThreadExecutor.execute(new Runnable {
      override def run(): Unit = {
        while (true) {
          val records = consumer.poll(1000)

          for (record <- records) {
            println("Record: " + record.value)
          }
        }
      }
    })
  }
}

object ScalaConsumer extends App {
  val testConsumer = new KafkaTestings()
  testConsumer.run()
} 

This was used as a reference to write this simple consumer.

Accepted answer

This is working as documented.

If you start a new consumer group (i.e. one for which there are no existing offsets stored in Kafka), you have to choose whether the consumer should start from the EARLIEST possible messages (the oldest messages still available in the topic) or from the LATEST (only messages produced from now on).

Is there a better solution than switching the AUTO_OFFSET_RESET_CONFIG after the first time I run the consumer?

You can keep it at EARLIEST, because the second time you run the consumer it will already have stored offsets and will just pick up from there. The reset policy is only used when a new consumer group is created.
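If you would rather keep the config at "latest" and still read the whole topic on a group's first run, one alternative is to rewind explicitly with seekToBeginning. This is only a sketch under the 0.9.x Java client API (where seekToBeginning takes varargs of TopicPartition); the broker list, group, and topic names are placeholders from the question:

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConversions._

object SeekToBeginningExample extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "listOfBrokers")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "anyGroupId")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("anyTopic"))

  // Poll once so the coordinator assigns partitions to this consumer,
  // then rewind every assigned partition to the beginning of the log.
  consumer.poll(0)
  for (tp <- consumer.assignment()) {
    consumer.seekToBeginning(tp) // 0.9.x varargs signature
  }
  // Subsequent poll() calls now start from the earliest available offsets.
}
```

Note that this rewinds on every start, so you would have to guard it (e.g. only seek when the group has no committed offsets); simply keeping the reset policy at EARLIEST, as described above, is the simpler route.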

Today I restart the consumer, with the same groupId1, there will be two options:

Not really. Since the consumer group was running the day before, it will find its committed offsets and just pick up where it left off. So no matter what you set the reset policy to, it will get these two new messages.

Be aware, though, that Kafka does not store these offsets forever; I believe the default is just a week. So if you shut down your consumers for longer than that, the offsets may be aged out, and you could run into an accidental reset to EARLIEST (which may be expensive for large topics). Given that, it is probably prudent to change it to LATEST anyway.
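The retention window mentioned above is controlled on the broker side. As a hedged illustration (the exact default depends on the Kafka version; older brokers used a shorter default of one day, while newer ones default to about a week):

```
# server.properties (broker side) - illustrative value, check your version's default
# How long committed consumer offsets are retained once a group is inactive.
offsets.retention.minutes=10080
```

If your consumers may be offline for longer than this window, raising it avoids the accidental reset described above.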
