KafkaProducer sendOffsetsToTransaction needs offset+1 to successfully commit the current offset


Problem description

I'm trying to implement a transaction in a Kafka Processor to make sure I don't process the same message twice. Given a message (A), I need to create a list of messages that will be produced to another topic in a transaction, and I want to commit the original message (A) in the same transaction. In the documentation I found the Producer method sendOffsetsToTransaction, which seems to commit an offset as part of a transaction, so that the offset is committed only if the transaction succeeds. This is the code inside the process() method of my Processor:

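    // Start a transaction covering both the produced items and the consumed offset.
    producer.beginTransaction()
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    // Offset of the message currently being processed.
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    // Attach the consumer offset to the transaction.
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    // Deliberate failure after the commit, to test what happens on restart.
    throw new RuntimeException("expected exception")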

Unfortunately, with this code (which deliberately fails on every execution), the already processed message (A) is reprocessed each time I restart the application after the exception.

I managed to make it work by adding 1 to the offset returned by this.context().offset() and redefining the val offsetAndMetadata like this:

    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)

Is this the normal behaviour, or am I doing something wrong?

Thanks :)

Recommended answer

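Your code with the +1 is correct.

The offsets you commit are the offsets of the messages you want to read next (not the offsets of the messages you read last). After a restart the consumer resumes at the committed offset, so committing the offset of message (A) itself causes (A) to be delivered again.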

Compare: https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processorTask/internals/Streams/processor/internals/Streams/
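
To make the "next offset" rule concrete, here is a minimal sketch that models a single partition in memory. It does not use the Kafka client at all: the names CommitSemanticsSketch, Record and resumeFrom are hypothetical and exist only for this illustration. The one assumption it encodes is the rule stated in the answer, namely that a consumer resumes at the committed offset.

```scala
// In-memory model of one partition; no Kafka client is involved.
// All names here are hypothetical and used only for illustration.
object CommitSemanticsSketch {
  final case class Record(offset: Long, value: String)

  // Records a consumer would receive after restarting with the given
  // committed offset. The consumer resumes AT the committed offset, so the
  // record stored at that offset is delivered again.
  def resumeFrom(log: Seq[Record], committed: Long): Seq[Record] =
    log.filter(_.offset >= committed)

  def main(args: Array[String]): Unit = {
    val log       = Seq(Record(0L, "A"), Record(1L, "B"), Record(2L, "C"))
    val processed = log.head // message (A), stored at offset 0

    // Committing the offset of the processed record: (A) is read again.
    val afterCommitSame = resumeFrom(log, processed.offset)
    // Committing offset + 1: the consumer continues with (B).
    val afterCommitNext = resumeFrom(log, processed.offset + 1)

    println(afterCommitSame.map(_.value).mkString(", ")) // prints "A, B, C"
    println(afterCommitNext.map(_.value).mkString(", ")) // prints "B, C"
  }
}
```

In the real Processor this corresponds to passing this.context().offset() + 1 to OffsetAndMetadata, as the question already does in its working version.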
