Kafka & Spring Batch - How to read ONLY uncommitted messages from the same topic?


Problem description

I am working on a small batch job with Spring Batch and Kafka that reads JSON data from a Kafka topic, converts it to a Student object, changes a value, and sends it back to a Kafka topic. Everything is working fine, but my only problem is that my consumer is ALWAYS reading from the beginning of the topic. I need it to read from the last unconsumed message. I have already added these properties:

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value

But this doesn't seem to work: on start-up, the consumer processes all the messages. Does anybody have an idea on how to do this with Spring Batch and Kafka? This is my code:

BatchStudent.java:

@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {
    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .writer(writer())
                .processor(customItemProcessor())
                .reader(reader())
                .build();
    }
}

app.yml

spring.batch.initialize-schema: always

#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student

#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092

#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student

Student.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}

CustomProcessor.java

@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentReceived) {
        final Student studentSent = new Student();
        studentSent.setId(studentReceived.getId());
        studentSent.setCount(200);
        return studentSent;
    }
    }
}

Thanks for your help!

Answer

Everything is working fine, but my only problem is that my consumer is ALWAYS reading from the beginning of the topic. I need it to read from the last unconsumed message.

Spring Batch 4.3 introduced a way to consume records from the offset stored in Kafka. I talked about this feature in my talk at Spring One last year: What's new in Spring Batch 4.3?. You can configure the Kafka reader with a custom starting offset in each partition by using setPartitionOffsets:

Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.
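Applied to the reader bean from the question, this would look roughly like the sketch below (assuming Spring Batch 4.3+, where KafkaItemReaderBuilder exposes a partitionOffsets method mirroring the reader's setPartitionOffsets):

```java
@Bean
public KafkaItemReader<Integer, Student> reader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());

    return new KafkaItemReaderBuilder<Integer, Student>()
            .partitions(0)
            // An empty map tells the reader to start from the offsets
            // stored in Kafka for the consumer group, instead of offset 0.
            .partitionOffsets(new HashMap<>())
            .consumerProperties(props)
            .name("students-consumer-reader")
            .saveState(true)
            .topic(topic)
            .build();
}
```

Note that for the stored offsets to be found across restarts, the group.id must stay the same between runs, so the random GROUP_ID_CONFIG value from the question would need to be replaced with a fixed one.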

You can find an example of how to use it in this test case.
