KafkaProducer sendOffsetsToTransaction需要offset + 1才能成功提交当前偏移量 [英] KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset
问题描述
我正在尝试在Kafka Processor
中实现交易,以确保不会重复处理同一则消息两次.给定一条消息(A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息(A).从文档中,我发现了Producer
方法sendOffsetsToTransaction
,该方法似乎只有在事务成功后才能在事务中提交偏移量.这是我的Processor
的process()
方法中的代码:
I'm trying to achieve a transaction in a Kafka Processor
to make sure I don't reprocess the same message twice. Given a message (A) I need to create a list of messages that will be produced on another topic in a transaction and i want to commit the original message (A) in the same transaction. From the documentation I found the Producer
method sendOffsetsToTransaction
which seems to be able to commit an offset in a transaction only if it succeeds. This is the code inside the process()
method of my Processor
:
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
不幸的是,使用此代码(每次执行显然失败),每次我在异常发生后重新启动应用程序时,都会重新处理已处理的消息(A).
Unfortunatly with this code (that obviously fail on each execution) the processed message (A) is reprocessed each time I re-start the application after the exception.
我设法使它在this.context().offset()
返回的偏移量上添加+1
并以这种方式重新定义val offsetAndMetadata
:
I manage to make it works adding a +1
to the offset returned by this.context().offset()
and redefining the val offsetAndMetadata
in this way:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
这是正常行为还是我做错了什么?
Is this the normal behaviour or I'm doing something wrong?
谢谢:)
推荐答案
您的代码正确.
您提交的偏移量是您接下来要阅读的消息的偏移量(不是您上次阅读的消息的偏移量).
The offsets you commit are the offsets of the messages you want to read next (not the offsets of the messages you did read last).
比较: 查看全文