KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量 [英] KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset
问题描述
我正在尝试在 Kafka Processor
中实现一个事务,以确保我不会两次重新处理相同的消息.给定一条消息 (A),我需要创建一个将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A).从文档中我发现了 Producer
方法 sendOffsetsToTransaction
似乎只有在成功时才能在事务中提交偏移量.这是我的 Processor
的 process()
方法中的代码:
I'm trying to achieve a transaction in a Kafka Processor
to make sure I don't reprocess the same message twice. Given a message (A) I need to create a list of messages that will be produced on another topic in a transaction and i want to commit the original message (A) in the same transaction. From the documentation I found the Producer
method sendOffsetsToTransaction
which seems to be able to commit an offset in a transaction only if it succeeds. This is the code inside the process()
method of my Processor
:
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
不幸的是,使用此代码(显然在每次执行时都会失败),每次我在异常后重新启动应用程序时,都会重新处理已处理的消息 (A).
Unfortunatly with this code (that obviously fail on each execution) the processed message (A) is reprocessed each time I re-start the application after the exception.
我设法使它工作,将 +1
添加到 this.context().offset()
返回的偏移量并重新定义 val offsetAndMetadata
这样:
I manage to make it works adding a +1
to the offset returned by this.context().offset()
and redefining the val offsetAndMetadata
in this way:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
这是正常行为还是我做错了什么?
Is this the normal behaviour or I'm doing something wrong?
谢谢:)
推荐答案
您的代码是正确的.
您提交的偏移量是您接下来要读取的消息的偏移量(而不是您上次读取的消息的偏移量).
The offsets you commit are the offsets of the messages you want to read next (not the offsets of the messages you did read last).
这篇关于KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!