PySpark 2.4.0: read Avro from Kafka with readStream (Python)
Question
I am trying to read Avro messages from Kafka using PySpark 2.4.0.
The spark-avro external module provides this solution for reading Avro files:
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
However, I need to read streamed Avro messages. The library documentation suggests using the from_avro() function, which is only available for Scala and Java.
Are there any other modules that support reading Avro messages streamed from Kafka?
Recommended answer
You can include the spark-avro package, for example using --packages
(adjust the versions to match your Spark installation):
bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0
and provide your own wrappers:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col, jsonFormatSchema):
    # Call the Scala from_avro function through the active JVM gateway.
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))

def to_avro(col):
    # Call the Scala to_avro function through the active JVM gateway.
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))
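As a rough sketch of how the from_avro wrapper could be applied to a Kafka stream: the function below reads the binary value column from Kafka and decodes it with a schema given as a JSON string. The schema, the read_users name, the broker address and the topic are all hypothetical placeholders, and the wrapper is passed in as an argument so that the snippet stands on its own. It also assumes that the Kafka source package (org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0) is on the classpath next to spark-avro, and that the messages are plain Avro binary; payloads written in the Confluent Schema Registry wire format carry an extra header that from_avro does not strip.

```python
import json

# Hypothetical schema, loosely modeled on the users.avro example above.
USER_SCHEMA_JSON = json.dumps({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_color", "type": ["null", "string"], "default": None},
    ],
})


def read_users(spark, from_avro, servers="localhost:9092", topic="users"):
    """Return a streaming DataFrame with one column per Avro field.

    spark     -- an active SparkSession
    from_avro -- the wrapper defined above (passed in to keep this sketch standalone)
    servers   -- assumed Kafka bootstrap servers
    topic     -- assumed topic name
    """
    raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .load()
    )
    # The Kafka source exposes the message payload as a binary `value` column.
    decoded = raw.select(from_avro(raw["value"], USER_SCHEMA_JSON).alias("user"))
    # Flatten the decoded struct into top-level columns.
    return decoded.select("user.*")
```

A call such as read_users(spark, from_avro).writeStream.format("console").start() would then print the decoded records, provided a broker is reachable at the assumed address. From Spark 3.0 onward, pyspark.sql.avro.functions ships from_avro and to_avro, so the hand-written wrappers are only needed on 2.4.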