How to generate summary statistics (using Summarizer.metrics) in streaming query?


Problem description


Currently, I am using Spark Structured Streaming to create data frames of random data in the form of (id, timestamp_value, device_id, temperature_value, comment).

Spark Dataframe per Batch:

Based on the screenshot of the data frame above, I would like to have some descriptive statistics for the column "temperature_value". For example, min, max, mean, count, variance.

My approach to achieve this in python is the following:

import sys
import json
import psycopg2
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col, to_json
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import get_json_object
from pyspark.ml.stat import Summarizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import lit,unix_timestamp
from pyspark.sql import functions as F
import numpy as np
from pyspark.mllib.stat import Statistics

spark = SparkSession.builder.appName(<spark_application_name>).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.streams.active

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka_broker:<port_number>").option("subscribe", <topic_name>).option("startingOffsets", "latest").load()

schema = StructType([
    StructField("id", DoubleType()),
    StructField("timestamp_value", DoubleType()), 
    StructField("device_id", DoubleType()), 
    StructField("temperature_value", DoubleType()),
    StructField("comment", StringType())])

telemetry_dataframe = data.selectExpr("CAST(value AS STRING)").select(from_json(col("value").cast("string"), schema).alias("tmp")).select("tmp.*")

telemetry_dataframe.printSchema()

temperature_value_selection = telemetry_dataframe.select("temperature_value")

temperature_value_selection_new = temperature_value_selection.withColumn("device_temperature", temperature_value_selection["temperature_value"].cast(DecimalType()))

temperature_value_selection_new.printSchema()

assembler = VectorAssembler(
  inputCols=["device_temperature"], outputCol="temperatures"
)

assembled = assembler.transform(temperature_value_selection_new)

assembled_new = assembled.withColumn("timestamp", F.current_timestamp())

assembled_new.printSchema()

# scaler = StandardScaler(inputCol="temperatures", outputCol="scaledTemperatures", withStd=True, withMean=False).fit(assembled)

# scaled = scaler.transform(assembled)

summarizer = Summarizer.metrics("max", "min", "variance", "mean", "count")

descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").select(summarizer.summary(assembled_new.temperatures))
#descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").groupBy(F.col("timestamp")).agg(max(F.col('timestamp')).alias("timestamp")).orderBy('timestamp', ascending=False).select(summarizer.summary(assembled.temperatures))

#descriptive_table_one = assembled_new.select(summarizer.summary(assembled.temperatures))

# descriptive_table_two = temperature_value_selection_new.select(summarizer.summary(temperature_value_selection_new.device_temperature))


# -------------------------------------------------------------------------------------

#########################################
#               QUERIES                 #
#########################################

query_1 = telemetry_dataframe.writeStream.outputMode("append").format("console").trigger(processingTime = "5 seconds").start()#.awaitTermination()

query_2 = temperature_value_selection_new.writeStream.outputMode("append").format("console").trigger(processingTime = "8 seconds").start()#.awaitTermination()

query_3 = assembled_new.writeStream.outputMode("append").format("console").trigger(processingTime = "11 seconds").start()#.awaitTermination()

#query_4_1 = descriptive_table_one.writeStream.outputMode("complete").format("console").trigger(processingTime = "14 seconds").start()#.awaitTermination()
query_4_2 = descriptive_table_one.writeStream.outputMode("append").format("console").trigger(processingTime = "17 seconds").start()#.awaitTermination()

Summarizer documentation.

Based on the posted code, I isolate the column "temperature_value" and then vectorize it (using VectorAssembler) to create the column "temperatures" of type vector.

What I would like is to output the result of the "Summarizer" computation to my console. This is why I use "append" for outputMode and format "console". But I was getting this error: pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark'. Thus, I used the "withWatermark" function, but I am still getting the same error with the outputMode "append".
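For reference, here is a minimal sketch (not part of the original post) of how a watermark is typically paired with an event-time window in Structured Streaming so that a streaming aggregation can be written out in "append" or "update" mode. It reuses the column names from the code above, picks arbitrary window and watermark durations, and uses the built-in SQL aggregate functions on the raw numeric column instead of Summarizer on the vector column, since the built-ins are known to work in streaming aggregations:

from pyspark.sql import functions as F

# Hypothetical sketch: aggregate per 1-minute event-time window on the
# watermarked "timestamp" column; durations are assumptions, not values
# from the original post.
windowed_stats = (
    assembled_new
        .withWatermark("timestamp", "4 minutes")
        .groupBy(F.window(F.col("timestamp"), "1 minute"))
        .agg(
            F.min("device_temperature").alias("min_temp"),
            F.max("device_temperature").alias("max_temp"),
            F.avg("device_temperature").alias("mean_temp"),
            F.variance("device_temperature").alias("var_temp"),
            F.count("device_temperature").alias("n"),
        )
)

stats_query = (
    windowed_stats.writeStream
        .outputMode("append")   # a window is emitted once the watermark passes its end
        .format("console")
        .option("truncate", "false")
        .start()
)

With this shape, "append" only emits a window after the watermark has moved past it, so output appears with a delay of roughly the watermark duration; "update" shows intermediate results sooner.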

When I tried to change the outputMode to "complete", the Spark streaming application terminated instantly in my terminal.

Instant streaming termination:

My questions:

  1. How should I use the "withWatermark" function in order to output the summary statistics of the vector column "temperatures" to my console?

  2. Is there any other approach to calculate descriptive statistics for a custom column of my data frame that I may have missed?

I appreciate any help in advance.

EDIT (20.12.2019)

A solution has been given and accepted; however, I now get the following error:

Solution

When I tried to change the outputMode to "complete", the Spark streaming application terminated instantly in my terminal.

All your streaming queries are up and running, but (the main thread of) the pyspark application does not even give them a chance to run for long, since it never awaits their termination (awaitTermination() is commented out as #.awaitTermination()).

You should block the main thread of the pyspark application using StreamingQuery.awaitTermination(), e.g. query_1.awaitTermination().
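As a rough illustration of that suggestion (reusing the query names from the posted code; when several queries run in the same application, spark.streams.awaitAnyTermination() is a common alternative to blocking on a single query):

query_1 = telemetry_dataframe.writeStream.outputMode("append").format("console").trigger(processingTime="5 seconds").start()
query_4_2 = descriptive_table_one.writeStream.outputMode("append").format("console").trigger(processingTime="17 seconds").start()

# Block the main thread until one of the active queries terminates (or fails),
# instead of letting the driver exit right after start().
spark.streams.awaitAnyTermination()

# Or wait on one specific query:
# query_1.awaitTermination()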
