如何在Spring Kafka Consumer中跳过损坏的(不可序列化的)消息? [英] How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?

查看:58
本文介绍了如何在Spring Kafka Consumer中跳过损坏的(不可序列化的)消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

该问题针对Spring Kafka,与 Apache Kafka有关高级消费者:跳过损坏的邮件

This question is for Spring Kafka, related to Apache Kafka with High Level Consumer: Skip corrupted messages

是否可以将Spring Kafka使用者配置为跳过无法读取/处理(损坏)的记录?

Is there a way to configure Spring Kafka consumer to skip a record that cannot be read/processed (is corrupt)?

我看到一种情况,如果无法反序列化,则消费者将卡在同一记录上.这是消费者抛出的错误.

I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws.

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

使用者轮询该主题,并一直循环循环打印相同的错误,直到程序被杀死为止.

The consumer polls the topic and just keeps printing the same error in a loop till program is killed.

在具有以下消费者工厂配置的@KafkaListener中,

In a @KafkaListener that has the following Consumer factory configurations,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

推荐答案

您需要 ErrorHandlingDeserializer :如果您无法转到该 2.2 版本,请考虑实施自己的版本,并为无法正确反序列化的那些记录返回 null .

If you can't move to that 2.2 version, consider to implement your own and return null for those records which can't be deserialized properly.

源代码在这里: 查看全文

相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆