使用 springboot 反序列化 KafkaConsumer 中的 kafka 消息 [英] Deserialize kafka messages in KafkaConsumer using springboot

查看:51
本文介绍了使用 springboot 反序列化 KafkaConsumer 中的 kafka 消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 springboot 应用程序,它可以监听 kafka 消息并将它们转换为对象

I have a springboot app that listen kafka messages and convert them to object

@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        Hostel hostel = objectMapper.readValue(message, Hostel.class);
}

我想知道是否可以直接做 ti

I woder if it is possible to do ti directly

@KafkaListener(topics = "test", groupId = "group_id")
public void consume(Hostel hostel) throws IOException { 
}

推荐答案

您可以使用 spring-kafka 来完成.但是你需要使用 自定义反序列化器(或JsonDeserializer)在容器工厂

You can do it using spring-kafka. But then you need to use a custom deserializer (or a JsonDeserializer) in the container factory

@KafkaListener(topics = "test", groupId = "my.group", containerFactory = "myKafkaFactory")
fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) {
//do Something with myRequest
ack.acknowledge()
}

您的 ContainerFactory 看起来像

Your ContainerFactory will look something like

@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
return factory
}

你的反序列化器看起来像

Your Deserialiser will look like

public class MyRequestDeserializer implements Deserializer {
private static ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map map, boolean b) {
}

@Override
public MyRequest deserialize(String arg0, byte[] msgBytes) {
    try {
        return objectMapper.readValue(new String(msgBytes), MyRequest.class);
    } catch (IOException ex) {
        log.warn("JSON parse/ mapping exception occurred. ", ex);
        return new MyRequest();
    }
}

@Override
public void close() {
    log.debug("MyRequestDeserializer closed");
}
}

或者,您可以使用 spring 文档

Alternatively, you can use the default JsonDeserializer as given in spring docs

这篇关于使用 springboot 反序列化 KafkaConsumer 中的 kafka 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆