Kafka - Why doesn't a fresh groupId return all messages in a topic when AUTO_OFFSET_RESET_CONFIG is set to "latest"?


Question

I am trying to implement a very simple Kafka (0.9.0.1) consumer in Scala (code below).

As I understand it, Kafka (or rather ZooKeeper) stores, for each groupId, the offset of the last consumed message for a given topic. So consider the following scenario:

  1. A consumer with groupId1 consumed the only 5 messages in a topic yesterday. The last consumed message now has offset 4 (the first message having offset 0).
  2. During the night, 2 new messages arrive in the topic.
  3. Today I restart the consumer with the same groupId1. There are two options:
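The lookup that happens at startup can be modelled with a small sketch. This is a simplified, hypothetical helper, not the actual client code: if the group already has a committed offset it is used, and only otherwise does the reset policy decide.

```scala
// Simplified model of how the starting position is chosen (hypothetical
// helper; the real decision happens inside the Kafka consumer/broker).
def startingOffset(committed: Option[Long],
                   resetPolicy: String,
                   earliest: Long,
                   latest: Long): Long =
  committed.getOrElse(resetPolicy match {
    case "earliest" => earliest // no stored offset: start from the oldest message
    case "latest"   => latest   // no stored offset: start from new messages only
    case other      => throw new IllegalArgumentException(s"unknown policy: $other")
  })

// groupId1 committed offset 5 yesterday (next message to read after
// consuming offsets 0..4), so the next fetch starts at 5 regardless
// of the reset policy:
println(startingOffset(Some(5L), "latest", earliest = 0L, latest = 7L)) // 5
```

Only when `committed` is empty (a fresh groupId, or an expired offset) does the policy string matter at all.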

Option 1: The consumer will read only the 2 new messages which arrived during the night if I set the following property to "latest":

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

Option 2: The consumer will read all 7 messages in the topic if I set the following property to "earliest":

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

Problem: If I change the consumer's groupId to groupId2, a brand-new groupId for the given topic that has never consumed any messages and therefore has no stored offset, I was expecting that by setting

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

the consumer would read all the messages stored in the topic during its first execution (the equivalent of having "earliest"), and then consume only the new ones on subsequent executions. However, this is not what happens.

If I set a new groupId and keep AUTO_OFFSET_RESET_CONFIG as "latest", the consumer is not able to read any of the existing messages. What I have to do instead is set AUTO_OFFSET_RESET_CONFIG to "earliest" for the first run, and once an offset has been committed for the groupId, I can switch to "latest".
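The switching workaround described above can be sketched as a small config helper. This is a hypothetical helper, and the string keys are spelled out (they are the values behind the corresponding `ConsumerConfig` constants) so the sketch runs without the Kafka client jar on the classpath:

```scala
import java.util.Properties

// Hypothetical helper for the workaround: "earliest" on the very first run
// of a new groupId, "latest" once the groupId has committed offsets.
// "auto.offset.reset" is the value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
def consumerProps(brokers: String, groupId: String, firstRun: Boolean): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", groupId)
  props.put("auto.offset.reset", if (firstRun) "earliest" else "latest")
  props
}
```

Note that this switching is exactly what the answer below argues is unnecessary: committed offsets take precedence over the reset policy, so leaving the property at "earliest" behaves the same after the first run.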

Is this how my consumer is supposed to work? Is there a better solution than switching AUTO_OFFSET_RESET_CONFIG after the first time I run the consumer?

Below is the code I am using as a simple consumer:

import java.util.{Collections, Properties}
import java.util.concurrent.Executors

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConverters._

class KafkaTestings {

  val brokers = "listOfBrokers"
  val groupId = "anyGroupId"
  val topic = "anyTopic"

  val props = createConsumerConfig(brokers, groupId)
  // the KafkaConsumer built from the config above
  val consumer = new KafkaConsumer[String, String](props)

  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "12321")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic))

    Executors.newSingleThreadExecutor.execute(new Runnable {
      override def run(): Unit = {
        while (true) {
          val records = consumer.poll(1000)

          // records is a java.lang.Iterable; asScala lets the for loop compile
          for (record <- records.asScala) {
            println("Record: " + record.value)
          }
        }
      }
    })
  }
}

object ScalaConsumer extends App {
  val testConsumer = new KafkaTestings()
  testConsumer.run()
}

This was used as a reference to write this simple consumer

Answer

This is stated in the documentation.

If you start a new consumer group (i.e. one for which there are no existing offsets stored in Kafka), you have to choose whether the consumer should start from the EARLIEST possible messages (the oldest messages still available in the topic) or from the LATEST (only messages produced from now on).

Is there a better solution than switching the AUTO_OFFSET_RESET_CONFIG after the first time I run the consumer?

You can keep it at EARLIEST, because the second time you run the consumer, it will already have stored offsets and will just pick up from there. The reset policy is only used when a new consumer group is created.

Today I restart the consumer, with the same groupId1, there will be two options:

Not really. Since the consumer group was running the day before, it will find its committed offsets and just pick up where it left off. So no matter what you set the reset policy to, it will get those two new messages.

Be aware though that Kafka does not store these offsets forever; I believe the default is just a week. So if you shut down your consumers for more than that, the offsets may be aged out, and you could run into an accidental reset to EARLIEST (which may be expensive for large topics). Given that, it is probably prudent to change it to LATEST anyway.
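The expiry warned about here can be modelled roughly. This is a hypothetical helper; the actual window is the broker setting `offsets.retention.minutes`, and its default varies by Kafka version:

```scala
import java.time.{Duration, Instant}

// Rough model of broker-side offset expiry: once a group's last commit is
// older than the retention window, the committed offset is discarded and
// auto.offset.reset applies again on the next start.
def committedOffsetStillValid(lastCommit: Instant,
                              retention: Duration,
                              now: Instant): Boolean =
  Duration.between(lastCommit, now).compareTo(retention) < 0

val now = Instant.now()
// A consumer stopped for 2 days is fine with a 7-day window, but one
// stopped for 8 days would fall back to the reset policy on restart.
println(committedOffsetStillValid(now.minus(Duration.ofDays(2)), Duration.ofDays(7), now)) // true
println(committedOffsetStillValid(now.minus(Duration.ofDays(8)), Duration.ofDays(7), now)) // false
```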
