Spark Python Avro Kafka Deserialiser


Problem Description

I have created a Kafka stream in a Python Spark app and can parse any text that comes through it:

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

I want to change this to be able to parse Avro messages from a Kafka topic. When parsing Avro messages from a file, I do it like this:

            reader = DataFileReader(open("customer.avro", "rb"), DatumReader())  # "rb": Avro data files are binary

I'm new to Python and Spark. How do I change the stream to be able to parse the Avro messages? Also, how can I specify a schema to use when reading the Avro messages from Kafka? I've done all this in Java before, but Python is confusing me.

Edit:

I tried changing it to include the Avro decoder:

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}, valueDecoder=avro.io.DatumReader(schema))

but I get the following error:

            TypeError: 'DatumReader' object is not callable

Solution

I had the same challenge: deserializing Avro messages from Kafka in PySpark. I solved it with the Confluent Schema Registry module's MessageSerializer, since in our case the schema is stored in a Confluent Schema Registry.

You can find that module at https://github.com/verisign/python-confluent-schemaregistry

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)


# A simple decoder to replace Kafka streaming's built-in UTF-8 decoding
def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()
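
The snippet above assumes an ssc (StreamingContext) that has already been created. For completeness, a minimal driver sketch around it, assuming Spark 1.5-era APIs, a hypothetical app name, and a hypothetical 10-second batch interval, might look like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Hypothetical app name and batch interval; adjust to your job.
sc = SparkContext(appName="avro-kafka-demo")
ssc = StreamingContext(sc, 10)

# schema_registry_client, serializer, and decoder defined as above.
kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"],
                                    {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"},
                                    valueDecoder=decoder)
kvs.map(lambda x: x[1]).pprint()

ssc.start()
ssc.awaitTermination()

You would then launch it with something like spark-submit --packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.5.1 so the Kafka integration classes are on the classpath.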

As you can see, this code uses the new direct approach with no receivers, hence createDirectStream (see more at https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)
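
For completeness: if the messages on the topic are plain Avro with no Schema Registry involved, the same valueDecoder hook still works, you just supply your own function. The TypeError in the question comes from passing a DatumReader instance directly: valueDecoder expects a callable that takes the raw message bytes, and a DatumReader is not callable. A sketch under that assumption, using the avro package's BinaryDecoder and a hypothetical customer.avsc schema file:

import io
import avro.io
import avro.schema

# Hypothetical schema file; avro.schema.parse is the Python 2 avro API
# (the Python 3 release spells it avro.schema.Parse).
schema = avro.schema.parse(open("customer.avsc").read())
reader = avro.io.DatumReader(schema)

def avro_decoder(raw_bytes):
    # Wrap the raw message bytes in a BinaryDecoder and let the
    # DatumReader do the actual parsing against the schema.
    return reader.read(avro.io.BinaryDecoder(io.BytesIO(raw_bytes)))

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"],
                                    {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"},
                                    valueDecoder=avro_decoder)

Note that this assumes the producer wrote raw Avro bytes; messages produced through the Confluent serializer carry an extra header (a magic byte plus a 4-byte schema id) that decode_message above strips for you, but this plain decoder would not.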
