Kafka Streams does not increment offset by 1 when producing to topic

Problem description

I have implemented a simple Kafka Dead letter record processor.

It works perfectly when consuming records produced by the console producer.

However, I find that our Kafka Streams applications do not guarantee that, when producing records to the sink topics, the offsets will be incremented by 1 for each record produced.

Dead Letter Processor Background:

I have a scenario where records may be received before all of the data required to process them has been published. When records cannot be matched for processing by the streams app, they are moved to a Dead Letter topic instead of continuing to flow downstream. When new data is published, we dump the latest messages from the Dead Letter topic back into the stream application's source topic for reprocessing with the new data.

The Dead Letter processor:

  • At the start of a run, the application records the ending offsets of each partition.
  • The ending offsets mark the point at which to stop processing records for a given Dead Letter topic, to avoid an infinite loop if reprocessed records return to the Dead Letter topic.
  • The application resumes from the last offsets produced by the previous run via consumer groups.
  • The application uses transactions and KafkaProducer#sendOffsetsToTransaction to commit the last produced offsets (see the sketch after this list).
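
For reference, here is a minimal sketch of how these steps might fit together. The topic names ("dead-letter-topic", "source-topic"), the group id "dlq-reprocessor", the transactional id, and the String serdes are assumptions for illustration only; the real application uses Avro with Schema Registry, and older client versions expose slightly different sendOffsetsToTransaction overloads.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class DeadLetterReprocessorSketch {

        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-reprocessor");            // assumed group id
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dlq-reprocessor-tx"); // assumed
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

                // Manually assign all partitions of the dead letter topic; with a group.id set,
                // the consumer still resumes from the offsets committed by the previous run.
                List<TopicPartition> partitions = consumer.partitionsFor("dead-letter-topic").stream()
                        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                        .collect(Collectors.toList());
                consumer.assign(partitions);

                // Snapshot the end offsets at start-up: this is where the run stops, so records
                // re-produced back into the dead letter topic are not consumed again.
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

                producer.initTransactions();

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                    for (ConsumerRecord<String, String> rec : records) {
                        // Dump the dead-letter record back into the source topic for reprocessing.
                        producer.send(new ProducerRecord<>("source-topic", rec.key(), rec.value()));
                        // Committed offset = next record to consume, i.e. lastProcessedOffset + 1.
                        offsetsToCommit.put(new TopicPartition(rec.topic(), rec.partition()),
                                new OffsetAndMetadata(rec.offset() + 1));
                    }
                    producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
                    producer.commitTransaction();
                }
                // The pause/exit check over 'endOffsets' is sketched further below.
            }
        }
    }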

To track when all records in my range have been processed for a topic's partition, my service compares the last offset produced by the producer to the consumer's saved map of ending offsets. When we reach the ending offset, the consumer pauses that partition via KafkaConsumer#pause, and when all partitions are paused (meaning they have reached the saved ending offset) it exits.
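
A sketch of what that completion check might look like, reusing the consumer and the endOffsets snapshot from the sketch above (the method name is hypothetical):

    // Pause each partition whose consume position has reached the end offset snapshotted
    // at start-up; once every assigned partition is paused, the run is complete.
    static boolean pauseFinishedPartitionsAndCheckDone(KafkaConsumer<String, String> consumer,
                                                       Map<TopicPartition, Long> endOffsets) {
        for (TopicPartition tp : consumer.assignment()) {
            // endOffsets was captured for these same partitions at start-up.
            if (consumer.position(tp) >= endOffsets.get(tp)) {
                consumer.pause(Collections.singletonList(tp));
            }
        }
        return consumer.paused().containsAll(consumer.assignment());
    }

Since KafkaConsumer#pause is a no-op for partitions that are already paused, this check can safely run after every poll.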

The Kafka Consumer API states:

Offsets and Consumer Position: Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.

The Kafka Producer API documentation for sendOffsetsToTransaction also suggests that the next offset is always the last processed offset + 1:

Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

But you can clearly see in my debugger that the records consumed for a single partition are anything but incremented by 1 at a time...

I thought maybe this was a Kafka configuration issue, such as max.message.bytes, but none of those really made sense. Then I thought perhaps it was caused by joining, but I didn't see any way that would change how the producer functions.

Not sure if it is relevant or not but all of our Kafka applications are using Avro and Schema Registry...

Should the offsets always increment by 1 regardless of the method of producing, or is it possible that the Kafka Streams API does not offer the same guarantees as the plain Producer/Consumer clients?

Is there just something that I am missing entirely?

Solution

It is not an official API contract that message offsets are increased by one, even if the JavaDocs indicate this (it seems that the JavaDocs should be updated).

  • If you don't use transactions, you get either at-least-once semantics or no guarantees (some call this at-most-once semantics). For at-least-once, records might be written twice, and thus the offsets of two consecutive messages are not really increased by one, because the duplicate write "consumes" two offsets.

  • If you use transactions, each commit (or abort) of a transaction writes a commit (or abort) marker into the topic -- those transactional markers also "consume" one offset (this is what you observe).

Thus, in general, you should not rely on consecutive offsets. The only guarantee you get is that each offset is unique within a partition.
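
To make the effect concrete, here is a small illustrative loop (reusing a read_committed consumer such as the one configured in the sketches above; the topic and variable names are assumptions) that reports the offset gaps left behind by transaction markers and filtered records:

    // Illustrative only: with a transactional producer writing the topic and
    // isolation.level=read_committed on the consumer, the offsets observed per partition
    // typically have gaps (e.g. 0, 1, 3, 4, 6, ...) because every commit/abort marker
    // occupies an offset without ever being delivered as a record.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (TopicPartition tp : records.partitions()) {
        long previous = -1L;
        for (ConsumerRecord<String, String> rec : records.records(tp)) {
            if (previous >= 0 && rec.offset() > previous + 1) {
                System.out.printf("%s: gap between offsets %d and %d (marker or filtered record)%n",
                        tp, previous, rec.offset());
            }
            previous = rec.offset();
        }
    }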
