如何使用pyspark和自定义python函数处理eventhub流 [英] How to process eventhub stream with pyspark and custom python function

查看:156
本文介绍了如何使用pyspark和自定义python函数处理eventhub流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我当前的设置是:

  • 使用pyspark 2.2.1的Spark 2.3.0
  • 使用Azure IOTHub/EventHub的流服务
  • 一些基于pandas,matplotlib等的自定义python函数

我正在使用

I'm using https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md as an example on how to read the data but:

  • 不能使用foreach接收器,因为这不是在python中实现的
  • 当我尝试调用.rdd,.map或.flatMap时,出现异常:必须使用writeStream.start()执行带有流源的查询"

获取流的每个元素并将其通过python函数传递的正确方法是什么?

What is the correct way to get each element of the stream and pass it through a python function?

谢谢

Ed

推荐答案

第一步,您将定义一个数据帧,以从EventHub或IoT-Hub读取数据作为流:

In the first step you define a dataframe reading the data as a stream from your EventHub or IoT-Hub:

from pyspark.sql.functions import *

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

数据以二进制形式存储在body属性中.要获取身体的元素,您必须定义结构:

The data is stored binary in the body attribute. To get the elements of the body you have to define the structure:

from pyspark.sql.types import *

Schema = StructType([StructField("name", StringType(), True),
                      StructField("dt", LongType(), True),
                      StructField("main", StructType( 
                          [StructField("temp", DoubleType()), 
                           StructField("pressure", DoubleType())])),
                      StructField("coord", StructType( 
                          [StructField("lon", DoubleType()), 
                           StructField("lat", DoubleType())]))
                    ])

,然后将模式应用于转换为字符串的正文上

and apply the schema on the body casted as a string:

from pyspark.sql.functions import *

rawData = df. \
  selectExpr("cast(Body as string) as json"). \
  select(from_json("json", Schema).alias("data")). \
  select("data.*")

在生成的数据帧上,您可以应用函数,例如. G.在名称"列上调用自定义函数u_make_hash:

On the resulting dataframe you can apply functions, e. g. call the custom function u_make_hash on the column 'name':

 parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))  

这篇关于如何使用pyspark和自定义python函数处理eventhub流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆