Kafka & Spring Batch - How to read ONLY uncommitted messages from the same topic?
Problem description
I am working on a small batch with Spring Batch and Kafka that reads JSON data from a Kafka topic, converts it to a Student object, changes a value and sends it back into a Kafka topic. Everything is working fine, but my only problem is that my consumer is ALWAYS reading from the beginning of the topic. I need it to read from the last unconsumed message. I have already added these properties:
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
But this doesn't seem to work: on start-up of the consumer, it processes all the messages. Does anybody have an idea on how to do it with Spring Batch and Kafka please? This is my code:
BatchStudent.java :
@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {

    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());
        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .reader(reader())
                .processor(customItemProcessor())
                .writer(writer())
                .build();
    }
}
app.yml
spring.batch.initialize-schema: always
#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student
#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092
#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student
Student.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}
CustomProcessor.java
@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentReceived) {
        final Student studentSent = new Student();
        studentSent.setId(studentReceived.getId());
        studentSent.setCount(200);
        return studentSent;
    }
}
Thank you for your help!
Recommended answer
Everything is working fine, but my only problem is that my consumer is ALWAYS reading from the beginning of the topic. I need it to read from the last unconsumed message.
Spring Batch 4.3 introduced a way to consume records from the offset stored in Kafka. I talked about this feature in my talk at SpringOne last year: What's new in Spring Batch 4.3?. You can configure the Kafka reader with a custom starting offset in each partition by using setPartitionOffsets:
Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.
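Based on that description, a minimal sketch of the reader bean from the question, adjusted to resume from the offsets stored in Kafka (this assumes Spring Batch 4.3+, where the builder exposes partitionOffsets, and the same bean layout as the question):

```java
@Bean
public KafkaItemReader<Integer, Student> reader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    return new KafkaItemReaderBuilder<Integer, Student>()
            .partitions(0)
            .consumerProperties(props)
            .name("students-consumer-reader")
            .saveState(true)
            .topic(topic)
            // An empty map tells the reader to start from the offset
            // stored in Kafka for the consumer group ID instead of
            // the default offset 0 in each partition (Spring Batch 4.3+).
            .partitionOffsets(new HashMap<>())
            .build();
}
```

Note that for this to help, the offsets must actually be stored in Kafka for that group: the consumer group ID has to be stable across runs (a random GROUP_ID_CONFIG, as in the question, would create a fresh group with no stored offsets every time), and offsets must be committed for the group.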