How to deserialize Avro messages from Kafka in Flink (Scala)?


Question

I'm reading messages from Kafka into the Flink shell (Scala), as follows:

scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091

Here I'm using SimpleStringSchema() as the deserializer, but the messages actually have a different Avro schema (say, msg.avsc). How do I create a deserializer based on that Avro schema (msg.avsc) to deserialize the incoming Kafka messages?

I haven't been able to find any code examples or tutorials for doing this in Scala, so any input would help. It seems that I may need to extend and implement

org.apache.flink.streaming.util.serialization.DeserializationSchema

to decode the messages, but I don't know how to do it. Any tutorials or instructions would be a great help. Since I don't want to do any custom processing, but just parse the messages according to the Avro schema (msg.avsc), any quick way of doing this would be very helpful.

Answer

I found an example of an AvroDeserializationSchema class in Java:

https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/avro/AvroDeserializationSchema.java

Code snippet:
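Roughly, such a schema wraps Avro's SpecificDatumReader in Flink's DeserializationSchema interface. A minimal Scala sketch along those lines (an approximation, not the linked Java file verbatim):

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.DeserializationSchema

class AvroDeserializationSchema[T <: SpecificRecordBase](avroType: Class[T])
    extends DeserializationSchema[T] {

  // Avro readers are not serializable, so build one lazily on each task.
  @transient private lazy val reader = new SpecificDatumReader[T](avroType)

  override def deserialize(message: Array[Byte]): T = {
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    reader.read(null.asInstanceOf[T], decoder)
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] =
    TypeExtractor.getForClass(avroType)
}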

If you want to deserialize into a specific case class, use new FlinkKafkaConsumer011[case_class_name] together with new AvroDeserializationSchema[case_class_name](classOf[case_class_name]). For example, with a case class DeviceData:

val stream = env
  .addSource(new FlinkKafkaConsumer011[DeviceData](
    "test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]),
    properties))

If you use Confluent's Schema Registry, the preferred solution is to use the Avro serde provided by Confluent. You just call deserialize(), and the resolution of which version of the Avro schema to use happens automatically behind the scenes; no byte manipulation is required.

Something like the following in Scala:

import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.GenericRecord

import scala.collection.JavaConverters._

// Holder for the deserialized key/value pair (shape implied by its use below).
case class KafkaKV(key: GenericRecord, value: GenericRecord)

...

// One deserializer for keys and one for values; both fetch the writer schema
// from the Schema Registry. The second configure() argument is isKey.
val keyDeserializer = new KafkaAvroDeserializer()
keyDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
  true)

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
  false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                         topic: String, partition: Int, offset: Long): KafkaKV = {
  val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
  val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
  KafkaKV(key, value)
}

...

It's explained in detail here: http://svend.kelesia.com/how-to-integrate-flink-with-confluents-schema-registry.html#how-to-integrate-flink-with-confluents-schema-registry
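For completeness: newer Flink versions also ship a registry-aware schema, ConfluentRegistryAvroDeserializationSchema in the flink-avro-confluent-registry module, which does the wiring above for you. A minimal sketch, assuming that module is on the classpath and the registry runs at http://localhost:8081 (a placeholder URL):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema

val schema: Schema = new Schema.Parser().parse(new java.io.File("msg.avsc"))

// Looks up the writer schema in the Schema Registry for each record and
// reads it into the given reader schema.
val deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(
  schema, "http://localhost:8081")

val stream = senv
  .addSource(new FlinkKafkaConsumer011[GenericRecord]("topic", deserializer, properties))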

Hope it helps!

