How to calculate lag difference in Spark Structured Streaming?
Question
I am writing a Spark Structured Streaming program. I need to create an additional column with the lag difference.
To reproduce my issue, I provide the following code snippet. This code consumes the data.json file stored in the data folder:
[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]
代码:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

# Lag over a row-based window partitioned by id
diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()
I get this error:
pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS prev_timestamp#129L]
Answer
pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets
This means that your window should be based on the timestamp column. So if you have a data point for each second, and you make a 30s window with a stride of 10s, the resulting window will create a new window column, with start and end columns containing timestamps that differ by 30s.
You should use the window in this way:
import pyspark.sql.functions as F

# Cast the event-time column, then aggregate over 30s windows sliding every 10s
words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))
w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
    .withWatermark('date_time', '1 minutes') \
    .groupBy(w).agg(F.mean('value'))
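To tie this back to the question's stream, here is a minimal sketch (not from the original answer) of how the same time-based window could be applied to the ds stream above. It assumes the epoch-seconds timestamp column is cast to an event-time column (called event_time here), and it uses the max-min spread of timestamps per id and window as a stand-in for the lag difference; the column names and the update output mode are illustrative choices:

import pyspark.sql.functions as F

# Sketch: adapt the answer to the question's schema. The epoch-seconds
# "timestamp" column is cast to a real timestamp, the stream is grouped by
# id and a 30s/10s sliding window, and the spread between the earliest and
# latest event in each window stands in for a lag-style difference.
events = ds.withColumn("event_time", F.col("timestamp").cast("timestamp"))

agg = events \
    .withWatermark("event_time", "1 minutes") \
    .groupBy("id", F.window("event_time", "30 seconds", "10 seconds")) \
    .agg((F.max("timestamp") - F.min("timestamp")).alias("lag_diff"))

query = agg \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()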