Spark Python Avro Kafka Deserialiser

Problem description

I have created a Kafka stream in a Python Spark app and can parse any text that comes through it.

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

I want to change this to be able to parse Avro messages from a Kafka topic. When parsing Avro messages from a file, I do it like:

            reader = DataFileReader(open("customer.avro", "rb"), DatumReader())  # binary mode; Avro container files are binary
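
For reference, a minimal sketch of consuming such a file reader: iterating it yields the decoded records.

            from avro.datafile import DataFileReader
            from avro.io import DatumReader

            with open("customer.avro", "rb") as f:
                reader = DataFileReader(f, DatumReader())
                for record in reader:  # each record is a dict matching the file's embedded schema
                    print(record)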

I'm new to Python and Spark; how do I change the stream to be able to parse the Avro message? Also, how can I specify a schema to use when reading the Avro message from Kafka? I've done all this in Java before, but Python is confusing me.

I tried changing it to include the Avro decoder:

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}, valueDecoder=avro.io.DatumReader(schema))

but I get the following error:

            TypeError: 'DatumReader' object is not callable

Recommended answer

I had the same challenge - deserializing Avro messages from Kafka in PySpark - and solved it with the MessageSerializer from the Confluent Schema Registry module, since in our case the schema is stored in a Confluent Schema Registry.

You can find the module at https://github.com/verisign/python-confluent-schemaregistry

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer

# Create a client pointed at the Schema Registry; MessageSerializer uses it
# to look up the schema referenced by each incoming message.
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)


# simple decoder to replace Kafka streaming's built-in UTF-8 decoding
def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()
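
Under the hood, decode_message handles the Confluent wire format (a magic byte, a 4-byte schema ID used to look up the writer schema in the registry, then the Avro-encoded payload), so the consumer code does not need to specify a schema explicitly.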

As you can see, this code uses the new direct approach with no receivers, hence createDirectStream (see https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html for more).
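
If the schema is not stored in a registry, the same valueDecoder hook works with a plain Avro reader. Below is a minimal sketch, assuming the writer schema is available locally (customer.avsc is a hypothetical file name) and that the messages are raw Avro binary without Confluent framing; note that valueDecoder must be a callable that takes raw bytes, which is why passing a DatumReader instance directly raised the TypeError in the question.

import io
import avro.io
import avro.schema
from pyspark.streaming.kafka import KafkaUtils

# Load the writer schema from a local file (hypothetical name).
schema = avro.schema.parse(open("customer.avsc").read())
avro_reader = avro.io.DatumReader(schema)

def avro_decoder(raw_bytes):
    # valueDecoder receives the raw message bytes; wrap and decode one record.
    return avro_reader.read(avro.io.BinaryDecoder(io.BytesIO(raw_bytes)))

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"],
                                    {"metadata.broker.list": "xxxxx:9092"},
                                    valueDecoder=avro_decoder)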
