如果在读取消息过程中发生任何异常,如何管理弹簧批处理作业中使用的 KafkaItemReader 中的偏移量 [英] How to manage offsets in KafkaItemReader used in spring batch job in case of any exception occurs in mid of reading messages

查看:31
本文介绍了如果在读取消息过程中发生任何异常,如何管理弹簧批处理作业中使用的 KafkaItemReader 中的偏移量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我第一次在开发基于 Kafka 的 Spring Boot 应用程序.我的要求是使用 spring 批处理创建一个包含所有记录的输出文件.我创建了一个 spring 批处理作业,其中集成了一个扩展 KafkaItemReader 的自定义类.我现在不想提交偏移量,因为我可能需要返回从已经消耗的偏移量中读取一些记录.我的消费者配置有这些属性;

I am working on a Kafka based spring boot application for the first time. My requirement was to create an output file with all the records using spring batch. I created a spring batch job where integrated with a customized class which extends KafkaItemReader. I don't want to commit the offsets for now as i might need to go back read some records from already consumed offsets. My consumer config has these properties;

enable.auto.commit: 假自动偏移重置:最新group.id:

enable.auto.commit: false auto-offset-reset: latest group.id:

有两种情况->1.一个快乐的路径,我可以从kafka主题中读取所有消息并转换它们,然后使用上述配置将它们写入输出文件.2. 我在阅读消息时遇到异常,我不确定在这种情况下如何管理偏移量.即使我回去休息偏移量,如何确保它是消息的正确偏移量.我不会在任何地方保留消息记录的有效负载,除非它转到 spring 批处理输出文件.

There are two scenarios-> 1. A happy path, where i can read all the messages from kafka topic and transform them and then write them to an output file using above configuration. 2. I am getting an exception while reading thru' the messages, and i am not sure how to manage the offsets in such cases. Even if i go back to rest the offset, how to make sure it is the correct offset for messages. I dont persist the payload of message record anywhere except it goes to the spring batch output file.

推荐答案

您需要为此使用持久性作业存储库并将 KafkaItemReader 配置为 保存其状态.状态包括分配给读取器的每个分区的偏移量,并将保存在块边界(也就是在每个事务中).

You need to use a persistent Job Repository for that and configure the KafkaItemReader to save its state. The state consists in the offset of each partition assigned to the reader and will be saved at chunk boundaries (aka at each transaction).

在重新启动的情况下,读取器将使用执行上下文中每个分区的最后一个偏移量进行初始化,并从它停止的位置继续.

In a restart scenario, the reader will be initialized with the last offset for each partition from the execution context and resume where it left off.

这篇关于如果在读取消息过程中发生任何异常,如何管理弹簧批处理作业中使用的 KafkaItemReader 中的偏移量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆