Pyspark结构化流处理 [英] Pyspark Structured streaming processing
问题描述
我正在尝试制作一个具有火花的结构化流应用程序,其主要思想是从kafka源中读取,处理输入,写回另一个主题.我已经成功地进行了来自kafka的火花读写,但是我的问题是处理部分.我已经尝试过foreach函数在写回kafka之前捕获每一行并对其进行处理,但是它始终只执行foreach部分,而从未写回kafka.但是,如果我从写入流中删除foreach部分,它将继续写入,但是现在我失去了处理能力.
I am trying to make a structured streaming application with spark the main idea is to read from a kafka source, process the input, write back to another topic. i have successfully made spark read and write from and to kafka however my problem is with the processing part. I have tried the foreach function to capture every row and process it before writing back to kafka however it always only does the foreach part and never writes back to kafka. If i however remove the foreach part from the writestream it would continue writing but now i lost my processing.
如果有人可以举一个例子给我一个例子,我将非常感激.
if anyone can give me an example on how to do this with an example i would be extremely grateful.
这是我的代码
spark = SparkSession \
.builder \
.appName("StructuredStreamingTrial") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "KafkaStreamingSource") \
.load()
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "StreamSink") \
.option("checkpointLocation", "./testdir")\
.foreach(foreach_function)
.start().awaitTermination()
和foreach_function
就是
def foreach_function(df):
try:
print(df)
except:
print('fail')
pass
推荐答案
在基于Pyspark的结构化流API中将数据写入Kafka接收器之前进行处理,我们可以轻松地使用UDF函数进行任何类型的复杂转换.
Processing the data before writing into Kafka sink in Pyspark based Structured Streaming API,we can easily handle with UDF function for any kind of complex transformation .
示例代码在下面.此代码试图读取JSON格式的消息Kafka主题,并解析该消息,以将消息从JSON转换为CSV格式,然后重写为另一个主题.您可以代替'json_formatted'函数处理任何处理转换.
example code is in below . This code is trying to read the JSON format message Kafka topic and parsing the message to convert the message from JSON into CSV format and rewrite into another topic. You can handle any processing transformation in place of 'json_formatted' function .
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct
from pyspark.sql.functions import udf
import json
import csv
import time
import os
# Spark Streaming context :
spark = SparkSession.builder.appName('pda_inst_monitor_status_update').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)
# Creating readstream DataFrame :
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "KafkaStreamingSource") \
.load()
df1 = df.selectExpr( "CAST(value AS STRING)")
df1.registerTempTable("test")
def json_formatted(s):
val_dict = json.loads(s)
return str([
val_dict["after"]["ID"]
, val_dict["after"]["INST_NAME"]
, val_dict["after"]["DB_UNIQUE_NAME"]
, val_dict["after"]["DBNAME"]
, val_dict["after"]["MON_START_TIME"]
, val_dict["after"]["MON_END_TIME"]
]).strip('[]').replace("'","").replace('"','')
spark.udf.register("JsonformatterWithPython", json_formatted)
squared_udf = udf(json_formatted)
df1 = spark.table("test")
df2 = df1.select(squared_udf("value"))
# Declaring the Readstream Schema DataFrame :
df2.coalesce(1).writeStream \
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "StreamSink") \
.option("checkpointLocation", "./testdir")\
.start()
ssc.awaitTermination()
这篇关于Pyspark结构化流处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!