Kafka Consumer offset commit check to avoid committing smaller offsets

Problem description

Assume we have a consumer that sends a request to commit offset 10. A communication problem occurs, so the broker never receives the request and of course never responds. Afterwards, another consumer processes another batch and successfully commits offset 20.

Q: Is there a way or a property we can use to check whether the previous offset in the log has been committed before we commit, in our case, offset 20?

Answer

The scenario you are describing can only happen when using asynchronous commits.
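
To see why, recall the behavior of the two commit variants. A minimal sketch (assuming an already configured KafkaConsumer[String, String] that is polled in a loop): commitSync blocks and internally retries retriable errors until the commit succeeds or fails fatally, so commits cannot overtake each other, whereas commitAsync just sends the request and does not retry on its own.

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

def processOneBatch(consumer: KafkaConsumer[String, String]): Unit = {
  val records = consumer.poll(Duration.ofMillis(100)).asScala
  for (record <- records) {
    // process the record here
  }
  // Synchronous commit: blocks and retries until it succeeds or fails fatally,
  // so a later commit can never be stored before an earlier one.
  consumer.commitSync()

  // Asynchronous commit: returns immediately and is not retried by the client,
  // which is what makes the "smaller offset after larger offset" scenario possible.
  // consumer.commitAsync()
}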

Keep in mind that one particular TopicPartition can only be consumed by a single consumer within the same ConsumerGroup. Two consumers can read the same TopicPartition only

  1. if they have different ConsumerGroups, or
  2. if they have the same ConsumerGroup and a rebalance happens. Even then, only one consumer at a time will read that TopicPartition, never both in parallel.

Case #1 is pretty clear: if they have different ConsumerGroups, they consume the partition in parallel and independently, and their committed offsets are also managed separately.

Case #2: If the first consumer fails to commit offset 10 because it failed/died and is not recovering, a consumer rebalance will happen and another active consumer will pick up that partition. Since offset 10 was not committed, the new consumer will start reading at offset 10 again before moving on to the next batch and possibly committing offset 20. This leads to "at-least-once" semantics and can cause duplicates.
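
To keep that reprocessing window small, one common pattern is to commit synchronously whenever partitions are about to be revoked during a rebalance. A minimal sketch, assuming manual commits and using the client's ConsumerRebalanceListener:

import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Commits the offsets of the records returned by the last poll() before the
// partitions are handed over, so the next owner resumes close to where we stopped.
class CommitOnRebalance(consumer: KafkaConsumer[String, String]) extends ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    consumer.commitSync()

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}

// usage: consumer.subscribe(java.util.Arrays.asList("myOutputTopic"), new CommitOnRebalance(consumer))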

Now, coming to the only scenario where you could commit a smaller offset after committing a higher offset. As said in the beginning, this could indeed happen if you asynchronously commit the offsets (using commitAsync). Imagine the following scenario, ordered by time:

  • Consumer reads offset 0 (background thread tries to commit offset 0)
  • committing offset 0 succeeded
  • Consumer reads offset 1 (background thread tries to commit offset 1)
  • committing offset 1 failed, try again later
  • Consumer reads offset 2 (background thread tries to commit offset 2)
  • committing offset 2 succeeded
  • Now, what to do: retry committing offset 1?

If you let the retry mechanism commit offset 1 again, it will look like your consumer has only committed up to offset 1. This is because the latest offset per TopicPartition for each ConsumerGroup is stored in the internal, compacted Kafka topic __consumer_offsets, which is meant to store only the latest value (in our case: offset 1) for our ConsumerGroup.
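
If you want to inspect what is currently stored for the group before committing (which is essentially what the question asks for), the consumer API can read the group's committed offset per partition. A minimal sketch, assuming you track the offsets you commit explicitly and your client is recent enough to have the Set-based committed() overload:

import java.util.Collections
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Commit `newOffset` only if it is ahead of what the group has already committed
// for this partition, so a retried, older commit cannot overwrite a newer value
// in __consumer_offsets.
def commitIfNewer(consumer: KafkaConsumer[String, String], tp: TopicPartition, newOffset: Long): Unit = {
  // committed() performs a blocking request to the broker for this group's stored offset
  val stored = consumer.committed(Collections.singleton(tp))
  val alreadyCommitted = Option(stored.get(tp)).map(_.offset()).getOrElse(-1L)

  if (newOffset > alreadyCommitted) {
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(newOffset)))
  }
}

Keep in mind that committed() is a remote, blocking call, so doing this before every commit adds latency; the sequence-number pattern described below avoids that extra round trip.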

In the book "Kafka - The Definitive Guide", there is a hint on how to mitigate this problem:

Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry because a newer commit was already sent.

As an example, you can see an implementation of this idea in Scala below:

import java.time.Duration
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // offsets are committed manually below, so turn off auto-commit
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}

