如何使用scala在kafka消费者中实现反序列化? [英] How to implement deserialisation in kafka consumer using scala?

查看:45
本文介绍了如何使用scala在kafka消费者中实现反序列化?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的 kafka 消费者代码中有以下行.

I have the following line in my kafka consumer's code.

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 

如何将此流行"反序列化为原始对象?通过将类扩展为可序列化,在 kafka 生产者中实现了可序列化.我正在使用 Scala 在 spark 中实现这一点.

How to deserialize this stream "lines" into original object? Serialisability was implemented in the kafka producer by extending class to serialisable. I am implementing this in spark using scala.

推荐答案

你需要实现一个自定义的Decoder 并将预期的类型信息与解码器一起提供给 createStream 函数.

You need to implement a custom Decoder and provide the expected type information together with the decoder to the createStream function.

KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)

例如,如果您使用 String 作为键和 CustomContainer 作为值,您的流创建将如下所示:

For example, if you are using String as key and CustomContainer as value, your stream creation will look like this:

val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)  

鉴于您将消息以 new KeyedMessage[String,String] 的形式发送到 kafka,正确的解码器是这样的字符串解码器:

Given that you are enconding the messages to kafka as new KeyedMessage[String,String], the right decoder is a string decoder like this:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)

这会给你一个 DStream[String,String] 作为你处理的基础.

that will give you a DStream[String,String] as basis for your processing.

如果你想发送/接收一个特定的对象类型,你需要实现一个 Kafka 编码器解码器.幸运的是,PcapPacket 已经实现了你需要的方法:

If you want to send/receive a specific object type you need to implement a Kafka Encoder and Decoder for it. Luckily for you, PcapPacket already implements the methods that you require to do that:

byte[] -> PcapPacket:public PcapPacket(byte[] buffer)

byte[] -> PcapPacket: public PcapPacket(byte[] buffer)

其余的是样板代码,用于实现 Kafka 所需的编码器/解码器接口.

The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.

这篇关于如何使用scala在kafka消费者中实现反序列化?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆