Reading Avro messages from Kafka in Spark Streaming / Structured Streaming
Problem description
I am using PySpark for the first time. Spark version: 2.3.0, Kafka version: 2.2.0.
I have a Kafka producer which sends nested data in Avro format, and I am trying to write Spark Streaming / Structured Streaming code in PySpark that will deserialize the Avro coming from Kafka into a DataFrame, apply transformations, and write it to S3 in Parquet format. I was able to find Avro converters for Spark/Scala, but support for PySpark has not yet been added. How do I do the same in PySpark? Thanks.
Answer
As you mentioned, there is no direct library for reading Avro messages from Kafka and parsing them in PySpark. However, we can read and parse Avro messages by writing a small wrapper around the Scala from_avro function and calling it on the Kafka value column in your PySpark streaming code, as shown below.
Refer: Pyspark 2.4.0, read avro from kafka with read stream - Python
Note: Avro is a built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of the "Apache Avro Data Source Guide".
Refer: https://spark-test.github.io/pyspark-coverage-site/pyspark_sql_avro_functions_py.html
Spark-submit:
[adjust the package versions to match the installed Spark/Avro versions]
/usr/hdp/2.6.1.0-129/spark2/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3 --conf spark.ui.port=4064
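For a non-interactive job, the equivalent spark-submit invocation is sketched below; the script name streaming_job.py is a placeholder for your own file, not part of the original answer:

/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.3 streaming_job.py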
Pyspark Streaming Code:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.column import Column, _to_java_column
import json
import csv
import time
import os
# Spark session (Structured Streaming does not need a StreamingContext) :
spark = SparkSession.builder.appName('streamingdata').getOrCreate()
sc = spark.sparkContext
# Kafka Topic Details :
KAFKA_TOPIC_NAME_CONS = "topicname"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost.com:9093'
# Creating readstream DataFrame :
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss" ,"false")\
.option("kafka.security.protocol","SASL_SSL")\
.option("kafka.client.id" ,"MCI-CIL")\
.option("kafka.sasl.kerberos.service.name","kafka")\
.option("kafka.ssl.truststore.location", "/path/kafka_trust.jks") \
.option("kafka.ssl.truststore.password", "changeit") \
.option("kafka.sasl.kerberos.keytab","/path/bdpda.headless.keytab") \
.option("kafka.sasl.kerberos.principal","bdpda") \
.load()
# Keep the raw Avro payload as binary; casting it to string would corrupt it
df1 = df.select("value")
df1.createOrReplaceTempView("test")
# Deserializing the Avro payload : a small Python wrapper around the
# Scala from_avro function shipped with the spark-avro package
def from_avro(col):
    # Avro reader schema describing the incoming records
    jsonFormatSchema = """
    {
        "type": "record",
        "name": "struct",
        "fields": [
            {"name": "col1", "type": "long"},
            {"name": "col2", "type": "string"}
        ]
    }"""
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))
# from_avro returns a Column, so it is applied directly as a column
# expression (wrapping it in a Python UDF would fail on the executors)
df1 = spark.table("test")
df2 = df1.select(from_avro("value").alias("data"))
# Writing the deserialized stream to the target path in parquet format :
query = df2.coalesce(1).writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/path/chk31") \
    .outputMode("append") \
    .start("/path/stream/tgt31")
query.awaitTermination()
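Side note: on Spark 3.0 and later, PySpark ships a native from_avro in pyspark.sql.avro.functions, so the JVM wrapper above is not needed (the matching spark-avro package still has to be on the classpath via --packages). A minimal sketch, assuming the same Avro schema string is available as jsonFormatSchema:

from pyspark.sql.avro.functions import from_avro
# "value" is the raw binary Avro payload read from Kafka
df2 = df1.select(from_avro("value", jsonFormatSchema).alias("data")).select("data.*")

The .select("data.*") flattens the parsed struct into top-level columns before writing to parquet.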