Pyspark Structured streaming processing


Problem description

I am trying to build a Structured Streaming application with Spark. The main idea is to read from a Kafka source, process the input, and write back to another topic. I have successfully made Spark read from and write to Kafka; my problem is with the processing part. I tried the foreach function to capture every row and process it before writing back to Kafka, but then only the foreach part runs and nothing is ever written back to Kafka. If I remove the foreach part from the writeStream, it writes again, but then I lose my processing.

If anyone can show me how to do this with an example, I would be extremely grateful.

Here is my code:

spark = SparkSession \
.builder \
.appName("StructuredStreamingTrial") \
.getOrCreate()
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "KafkaStreamingSource") \
  .load()

ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream \
  .outputMode("update") \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "StreamSink") \
  .option("checkpointLocation", "./testdir")\
  .foreach(foreach_function) \
  .start().awaitTermination()

and the foreach_function is just:

def foreach_function(df):
    try:
        print(df)
    except:
        print('fail')
    pass 

Recommended answer

To process the data before writing it to the Kafka sink with the PySpark-based Structured Streaming API, we can easily handle any kind of complex transformation with a UDF.

The example code is below. It reads JSON-formatted messages from a Kafka topic, parses each message to convert it from JSON into CSV format, and rewrites it to another topic. You can put any processing transformation in place of the 'json_formatted' function.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
import json

#  Spark session (the DStream StreamingContext is not needed for
#  Structured Streaming, so it is not created here) :

spark = SparkSession.builder.appName('pda_inst_monitor_status_update').getOrCreate()


#  Creating the readStream DataFrame :

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "KafkaStreamingSource") \
  .load()

df1 = df.selectExpr("CAST(value AS STRING)")

df1.createOrReplaceTempView("test")


def json_formatted(s):
    val_dict = json.loads(s)
    return str([
                    val_dict["after"]["ID"]
                ,   val_dict["after"]["INST_NAME"]
                ,   val_dict["after"]["DB_UNIQUE_NAME"]
                ,   val_dict["after"]["DBNAME"]
                ,   val_dict["after"]["MON_START_TIME"]
                ,   val_dict["after"]["MON_END_TIME"]
                ]).strip('[]').replace("'","").replace('"','')
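# For illustration (hypothetical input), a message such as
#   {"after": {"ID": 1, "INST_NAME": "inst1", "DB_UNIQUE_NAME": "db1u",
#              "DBNAME": "db1", "MON_START_TIME": "t0", "MON_END_TIME": "t1"}}
# comes back from json_formatted as the CSV line:
#   1, inst1, db1u, db1, t0, t1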

json_udf = udf(json_formatted)
df1 = spark.table("test")

# The Kafka sink expects a string/binary column named "value",
# so alias the UDF output accordingly.
df2 = df1.select(json_udf("value").alias("value"))



#  Writing the transformed stream back to Kafka :

query = df2.coalesce(1).writeStream \
   .outputMode("update") \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "localhost:9092") \
   .option("topic", "StreamSink") \
   .option("checkpointLocation", "./testdir") \
   .start()

query.awaitTermination()
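A note on why the original attempt never reached Kafka: calling .foreach(...) on a writeStream replaces the sink, so the earlier format("kafka") is ignored and rows only pass through the foreach writer. If you really want row-level Python logic instead of a UDF, Spark 2.4+ provides foreachBatch, which hands each micro-batch to a function as an ordinary DataFrame that you can transform and then write to Kafka with the regular batch writer. A minimal sketch along those lines (process_batch is a hypothetical helper; the topic and server names are taken from the question):

def process_batch(batch_df, batch_id):
    # batch_df is a plain (non-streaming) DataFrame for this micro-batch,
    # so any row-level processing can happen here before the write.
    out = batch_df.selectExpr("CAST(key AS STRING) AS key",
                              "CAST(value AS STRING) AS value")
    out.write \
       .format("kafka") \
       .option("kafka.bootstrap.servers", "localhost:9092") \
       .option("topic", "StreamSink") \
       .save()

df.writeStream \
  .foreachBatch(process_batch) \
  .option("checkpointLocation", "./testdir") \
  .start() \
  .awaitTermination()

Because foreachBatch reuses the regular batch writers, the Kafka write looks exactly like the non-streaming case, while the processing still runs once per micro-batch.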
