How to process eventhub stream with pyspark and custom python function

Question
My current setup is:

- Spark 2.3.0 with pyspark 2.2.1
- a streaming service using Azure IoTHub/EventHub
- some custom python functions based on pandas, matplotlib, etc.

I'm using https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md as an example on how to read the data, but:

- the foreach sink cannot be used, because it is not implemented in Python
- when I try to call .rdd, .map or .flatMap I get the exception: "Queries with streaming sources must be executed with writeStream.start()"
What is the correct way to get each element of the stream and pass it through a python function?
Thanks,
Ed
Answer
In the first step you define a dataframe that reads the data as a stream from your EventHub or IoT Hub:
from pyspark.sql.functions import *

df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()
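For context, `ehConf` is a plain Python dict of connector options. A minimal sketch with a placeholder connection string (the endpoint, key names and entity path below are hypothetical; note that recent versions of the azure-event-hubs-spark connector additionally require encrypting the connection string via `EventHubsUtils.encrypt`):

```python
# Hypothetical connection string -- replace the placeholders with your
# own Event Hub namespace, access key and hub name.
conn_str = (
    "Endpoint=sb://<namespace>.servicebus.windows.net/;"
    "SharedAccessKeyName=<key-name>;"
    "SharedAccessKey=<key>;"
    "EntityPath=<eventhub-name>"
)

# The connector reads all of its settings from an options dict,
# which is unpacked into .options(**ehConf) above.
ehConf = {
    "eventhubs.connectionString": conn_str,
}
```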
The data is stored in binary form in the body attribute. To get at the elements of the body you have to define its structure:
from pyspark.sql.types import *

Schema = StructType([
    StructField("name", StringType(), True),
    StructField("dt", LongType(), True),
    StructField("main", StructType([
        StructField("temp", DoubleType()),
        StructField("pressure", DoubleType())])),
    StructField("coord", StructType([
        StructField("lon", DoubleType()),
        StructField("lat", DoubleType())]))
])
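To see what this schema expects, here is a made-up sample body checked with plain Python's json module: the field names and nesting mirror the StructType above, but the values are invented for illustration:

```python
import json

# Invented sample payload matching the schema above:
# top-level name and dt, nested main.temp/main.pressure
# and coord.lon/coord.lat.
body = '''{"name": "Berlin",
           "dt": 1523456789,
           "main": {"temp": 17.5, "pressure": 1013.2},
           "coord": {"lon": 13.4, "lat": 52.5}}'''

record = json.loads(body)
print(record["name"], record["main"]["temp"])  # → Berlin 17.5
```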
and apply the schema to the body cast as a string:
from pyspark.sql.functions import *

rawData = df \
    .selectExpr("cast(Body as string) as json") \
    .select(from_json("json", Schema).alias("data")) \
    .select("data.*")
On the resulting dataframe you can apply functions, e.g. call a custom function u_make_hash on the column 'name':
parsedData = rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))
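u_make_hash is not defined in the answer; it stands for any user-defined function. A minimal sketch, where the hashing logic itself is an assumption for illustration: write a plain Python function and wrap it with pyspark's udf so it can be used in select as above:

```python
import hashlib

def make_hash(name):
    """Hash a name to a short hex digest (SHA-256 chosen for illustration)."""
    if name is None:
        return None
    return hashlib.sha256(name.encode("utf-8")).hexdigest()[:16]

# In a Spark job you would register it as a UDF, e.g.:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import StringType
#   u_make_hash = udf(make_hash, StringType())
```

Note that parsedData is still a streaming dataframe, which is why calling .rdd or .map fails: to actually run the query it has to be started with a sink, e.g. `parsedData.writeStream.format("console").start()`.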