How to decode/deserialize Avro messages from Kafka with Python

Problem description

I am receiving Kafka Avro messages in Python from a remote server (using the consumer from the Confluent Kafka Python library). They represent clickstream data as JSON dictionaries with fields such as user agent, location, and URL. Here is what a message looks like:

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'

How do I decode it? I tried BSON decoding, but the string was not recognized as UTF-8, presumably because it uses Avro's own binary encoding. I found https://github.com/verisign/python-confluent-schemaregistry, but it only supports Python 2.7. Ideally I would like to use Python 3.5+ and MongoDB to process and store the data, since that is my current infrastructure.
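Why BSON/UTF-8 decoding fails: Avro's binary encoding writes no field names or delimiters, only values in schema order. Integers are zigzag-encoded variable-length numbers, and a string is its byte length (as such a number) followed by UTF-8 bytes, so `\x10pageview` is length 8 plus `pageview`, and `\x0eBangkok` is length 7 plus `Bangkok`. Without the schema there is no way to know where one field ends and the next begins. The following is a minimal stdlib-only sketch of those primitive rules, not a full decoder. The function names are hypothetical, and treating fields like `\x02\x10Thailand` as a `["null", "string"]` union (branch index 1, then the string) is an assumption, since the schema itself is not shown.

```python
from typing import Optional, Tuple


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read an Avro int/long (zigzag varint). Returns (value, new_position)."""
    shift = 0
    result = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:  # high bit clear: this was the last byte
            break
        shift += 7
    # Undo zigzag mapping: 0->0, 1->-1, 2->1, 3->-2, ...
    return (result >> 1) ^ -(result & 1), pos


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read an Avro string: a long byte count followed by UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    end = pos + length
    return buf[pos:end].decode("utf-8"), end


def read_optional_string(buf: bytes, pos: int) -> Tuple[Optional[str], int]:
    """Read a (hypothetical) ["null", "string"] union: branch index, then value."""
    branch, pos = read_long(buf, pos)
    if branch == 0:  # "null" branch carries no payload bytes
        return None, pos
    return read_string(buf, pos)


if __name__ == "__main__":
    print(read_string(b"\x10pageview", 0))                # ('pageview', 9)
    print(read_optional_string(b"\x02\x10Thailand", 0))  # ('Thailand', 10)
```

In practice the Avro library applies these rules for every field in the schema, which is why it can decode the message when BSON cannot.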

Recommended answer

I thought the Avro library was only for reading Avro files, but it actually solved the problem of decoding the Kafka messages, as follows: I first import the libraries and parse the schema file, then create a function that decodes a message into a dictionary, which I can call in the consumer loop.

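import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

# Parse the writer's schema once. avro >= 1.10 spells this avro.schema.parse;
# the older, deprecated avro-python3 package used avro.schema.Parse.
with open("data_sources/EventRecord.avsc") as f:
    schema = avro.schema.parse(f.read())
reader = DatumReader(schema)

def decode(msg_value):
    """Decode a raw Avro-encoded message value into a Python dict."""
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

# Placeholder connection settings and topic name: replace with your own.
c = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "clickstream-consumer",
    "auto.offset.reset": "earliest",
})
topic = "clickstream"
c.subscribe([topic])  # subscribe() expects a list of topic names

running = True
try:
    while running:
        msg = c.poll(1.0)  # wait up to 1 second for a message
        if msg is None:    # no message within the timeout
            continue
        if not msg.error():
            event_dict = decode(msg.value())
            print(event_dict)
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
finally:
    # Commits final offsets (when auto-commit is enabled) and leaves the group.
    c.close()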
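The `decode` function above assumes each message value is bare Avro binary. If a producer instead uses Confluent's Schema Registry serializer, each value is framed as one magic byte (`0`), a 4-byte big-endian schema ID, and then the Avro payload, so those 5 bytes must be stripped before decoding (newer confluent-kafka releases also ship `AvroDeserializer` in `confluent_kafka.schema_registry.avro` for this case on Python 3). The sample message in the question starts with `\x01`, so it does not appear to be Confluent-framed, which is consistent with the plain `DatumReader` approach working. A minimal stdlib sketch of splitting that framing; the function name is hypothetical:

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(msg_value: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed value into (schema_id, avro_payload).

    Layout: 1 magic byte (0), a 4-byte big-endian schema ID, then Avro binary.
    """
    if len(msg_value) < 5:
        raise ValueError("message too short for Confluent framing")
    magic, schema_id = struct.unpack(">BI", msg_value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed")
    return schema_id, msg_value[5:]
```

Usage in the consumer loop would be `schema_id, payload = split_confluent_frame(msg.value())` followed by `decode(payload)`, where `schema_id` identifies the writer schema to fetch from the registry.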
