How to manage offsets in KafkaItemReader used in a Spring Batch job in case an exception occurs in the middle of reading messages


Problem description


I am working on a Kafka-based Spring Boot application for the first time. My requirement is to create an output file with all the records using Spring Batch. I created a Spring Batch job integrated with a customized class that extends KafkaItemReader. I don't want to commit the offsets for now, as I might need to go back and read some records from already consumed offsets. My consumer config has these properties:


enable.auto.commit: false
auto-offset-reset: latest
group.id:
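In a Spring Boot application, these consumer properties would typically live under `spring.kafka.consumer` in `application.yml`. A minimal sketch (property names follow Spring Boot's Kafka support; the `group.id` value is left out, as in the question):

```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: false   # offsets are not committed automatically
      auto-offset-reset: latest   # start from the latest offset when no committed offset exists
```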


There are two scenarios: 1. A happy path, where I can read all the messages from the Kafka topic, transform them, and then write them to an output file using the above configuration. 2. I get an exception while reading through the messages, and I am not sure how to manage the offsets in such cases. Even if I go back to reset the offset, how do I make sure it is the correct offset for the messages? I don't persist the payload of the message record anywhere except in the Spring Batch output file.

Recommended answer


You need to use a persistent job repository for that and configure the KafkaItemReader to save its state. The state consists of the offset of each partition assigned to the reader, and it will be saved at chunk boundaries (i.e. at each transaction).
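A minimal sketch of such a reader using the `KafkaItemReaderBuilder` from Spring Batch. The broker address, group id, topic name, and partition list below are placeholders for illustration, not values from the original question:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;

public class ReaderConfig {

    public KafkaItemReader<String, String> kafkaItemReader() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-reader");            // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets tracked via the reader's state
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        return new KafkaItemReaderBuilder<String, String>()
                .name("kafkaItemReader")   // name used to key the reader's state in the execution context
                .consumerProperties(props)
                .topic("inputTopic")       // placeholder topic name
                .partitions(0)             // partitions explicitly assigned to this reader
                .saveState(true)           // persist per-partition offsets at chunk boundaries
                .build();
    }
}
```

With `saveState(true)` and a persistent job repository, a failed job execution can be restarted and the reader picks up from the offsets stored in the execution context rather than re-reading from the beginning.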


In a restart scenario, the reader will be initialized with the last offset for each partition from the execution context and resume where it left off.

