Spring-Kafka使用Spring Boot版本2.3.7进行批量错误处理 [英] spring-kafka consumer batch error handling with spring boot version 2.3.7
本文介绍了Spring-Kafka使用Spring Boot版本2.3.7进行批量错误处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试执行Spring Kafka批处理错误处理。首先,我有几个问题。
监听器错误处理程序和容器错误处理程序有什么区别,这两个类别有哪些错误?
您能帮助一些样本更好地了解这一点吗?
这是我们的设计:
- 每隔一定时间间隔轮询
- 批量消费消息
- 基于键推送到本地缓存(应用缓存)(避免重复事件)
- 批处理完成后,将所有值逐个推送到另一个主题。
- 操作3完成后清除缓存并手动确认偏移量。
以下是我的错误处理计划:
public ConcurrentKafkaListenerContainerFactory<String, String> myListenerPartitionContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
factory.setConcurrency(partionCount);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setIdleBetweenPolls(pollInterval);
factory.setBatchListener(true);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myPartitionsListenerContainerFactory()
{
return myListenerPartitionContainerFactory(groupIdPO);
}
@Bean
public RecoveringBatchErrorHandler(KafkaTemplate<String, String> errorKafkaTemplate) {
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(errorKakfaTemplate);
RecoveringBatchErrorHandler errorHandler =
new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 5000)); // push error event to the error topic
}
@KafkaListener(id = "mylistener", topics = "someTopic", containerFactory = "myPartitionsListenerContainerFactory"))
public void listen(List<ConsumerRecord<String, String>> records, @Header(KafkaHeaders.MESSAGE_KEY) String key, Acknowledgement ack) {
Map hashmap = new Hashmap<>();
records.forEach(record -> {
try {
//key will be formed based on the input record - it will be id.
hashmap.put(key, record);
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", record);
}
});
// Once success each messages to another topic.
try {
hashmap.forEach( (key,value) -> { push to another topic })
hashmap.clear();
ack.acknowledge();
} catch(Exception ex) {
//handle producer exceptions
}
}
方向是好的还是需要改进的?还需要实现什么类型的容器和侦听器处理程序?
@Gary Russell..你能帮个忙吗?
推荐答案
侦听程序错误处理程序适用于错误处理程序可以向发件人返回有意义的答复的请求/答复情况。
您需要引发异常才能触发容器错误处理程序,并且您需要知道原始批处理中的索引以告诉它哪条记录失败。
如果您使用的是类似的手动确认,则可以使用nack()方法来指示失败的记录(在这种情况下不要引发异常)。
这篇关于Spring-Kafka使用Spring Boot版本2.3.7进行批量错误处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文