PySpark: Deserializing an Avro serialized message contained in an eventhub capture avro file


Problem description

AVRO serialized events are sent to an Azure event hub. These events are stored persistently using the Azure Event Hubs capture feature. Captured data, along with event hub metadata, is written in Apache Avro format. The original events contained in the capture avro file shall be analyzed using (py)Spark.

How to deserialize an AVRO serialized event that is contained within a field / column of an AVRO file using (py)Spark? (Note: the avro schema of the event is not known to the reader application, but it is contained within the message as an avro header.)

The background is an analytical platform for an IoT scenario. Messages are provided by an IoT platform running on Kafka. To be more flexible with schema changes, the strategic decision is to stick with the avro format. To enable the use of Azure Stream Analytics (ASA), the avro schema is specified with each message (otherwise ASA is not able to deserialize the message).

The schema of the avro files generated by the event hub capture feature is listed below:

{
    "type":"record",
    "name":"EventData",
    "namespace":"Microsoft.ServiceBus.Messaging",
    "fields":[
                 {"name":"SequenceNumber","type":"long"},
                 {"name":"Offset","type":"string"},
                 {"name":"EnqueuedTimeUtc","type":"string"},
                 {"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Body","type":["null","bytes"]}
             ]
}

(Note that the actual message is stored in the Body field as bytes.)

For illustration, I sent events with the following avro schema to the event hub:

{
    "type" : "record",
    "name" : "twitter_schema",
    "namespace" : "com.test.avro",
    "fields" : [ 
                {"name" : "username","type" : "string"}, 
                {"name" : "tweet","type" : "string"},
                {"name" : "timestamp","type" : "long"}
    ]
}

Example event:

{
    "username": "stackoverflow",
    "tweet": "please help deserialize me",
    "timestamp": 1366150681
}

Example avro message payload (encoded as a string; note that the avro schema is included):

Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}

So, in the end, this payload will be stored as bytes in the 'Body' field of the capture avro file.
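
The Objavro.schema prefix shows that each payload is a complete Avro object container file: the writer schema is embedded in the file header in front of the data blocks, which is why a reader does not need to know it in advance. As a rough illustration (not part of the original post), such a payload could be produced with the fastavro package, assuming it is available:

import io
from fastavro import writer, parse_schema

# Schema of the inner event (same as the twitter_schema above)
schema = parse_schema({
    "type": "record",
    "name": "twitter_schema",
    "namespace": "com.test.avro",
    "fields": [
        {"name": "username", "type": "string"},
        {"name": "tweet", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
})

# Write one record into an in-memory Avro object container file
buf = io.BytesIO()
writer(buf, schema, [{
    "username": "stackoverflow",
    "tweet": "please help deserialize me",
    "timestamp": 1366150681,
}])
payload = buf.getvalue()  # starts with the "Obj" magic bytes followed by avro.schema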


For ease of use, testing, and debugging, I currently use a PySpark Jupyter notebook.

Configuration of the Spark session:

%%configure
{
    "conf": {
        "spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
    }
}

Reading the avro file into a dataframe and showing the result:

capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()

Result:

+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset|     EnqueuedTimeUtc|SystemProperties|Properties|                Body|
+--------------+------+--------------------+----------------+----------+--------------------+
|            71|  9936|11/4/2018 4:59:54 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|
|            72| 10448|11/4/2018 5:00:01 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|

Getting the content of the 'Body' field and casting it to a string:

msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])

That's how far I got the code working. I spent a lot of time trying to deserialize the actual message, but without success. I would appreciate any help!

Some additional info: Spark is running on a Microsoft Azure HDInsight 3.6 cluster. The Spark version is 2.2. The Python version is 2.7.12.
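
Since each Body payload is itself a complete Avro object container file with the writer schema embedded, one possible approach (a rough sketch, not part of the original question; it assumes the fastavro package is installed on all worker nodes, and the helper name deserialize_body is illustrative) is to parse every payload with an Avro reader inside an RDD transformation:

import io
from fastavro import reader

def deserialize_body(body_bytes):
    # The writer schema is taken from the payload header itself,
    # so it does not need to be known in advance.
    with io.BytesIO(bytes(body_bytes)) as buf:
        return list(reader(buf))

records = (capture_df
           .where(capture_df.Body.isNotNull())
           .select("Body")
           .rdd
           .flatMap(lambda row: deserialize_body(row.Body)))
records.take(2)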

Answer

What you want to do is apply .decode('utf-8') to each element in the Body column. You have to create a UDF from decode, so you can apply it. The UDF can be created with:

from pyspark.sql import functions as f

decodeElements = f.udf(lambda a: a.decode('utf-8'))
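
Applied to the dataframe from the question, it could be used like this (the column name Json is just for illustration):

decoded_df = capture_df.withColumn("Json", decodeElements(capture_df["Body"]))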

Here is a complete example for parsing data from an IoT Hub that captures to a custom Blob Storage endpoint:

storage_account_name = "<YOUR STORAGE ACCOUNT NAME>"
storage_account_access_key = "<YOUR STORAGE ACCOUNT KEY>"

# Read all files from one day. All PartitionIds are included. 
file_location = "wasbs://<CONTAINER>@"+storage_account_name+".blob.core.windows.net/<IoT Hub Name>/*/2018/11/30/*/*"
file_type = "avro"

# Read raw data
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

reader = spark.read.format(file_type).option("inferSchema", "true")
raw = reader.load(file_location)

# Decode Body into strings
from pyspark.sql import functions as f

decodeElements = f.udf(lambda a: a.decode('utf-8'))

jsons = raw.select(
    raw['EnqueuedTimeUtc'],
    raw['SystemProperties.connectionDeviceId'].alias('DeviceId'), 
    decodeElements(raw['Body']).alias("Json")
)

# Parse Json data
from pyspark.sql.functions import from_json

json_schema = spark.read.json(jsons.rdd.map(lambda row: row.Json)).schema
data = jsons.withColumn('Parsed', from_json('Json', json_schema)).drop('Json')
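
The individual message fields can then be accessed through the Parsed struct column, for example:

data.select("EnqueuedTimeUtc", "DeviceId", "Parsed.*").show(truncate=False)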

Disclaimer: I am new to both Python and Databricks, and my solution is probably less than perfect. But I spent more than a day to get this working, and I hope it can be a good starting point for someone.
