How to calculate lag difference in Spark Structured Streaming?

Problem description

I am writing a Spark Structured Streaming program. I need to create an additional column with the lag difference.

To reproduce my issue, I provide a code snippet. The code consumes a data.json file stored in the data folder:

[
  {"id": 77,"type": "person","timestamp": 1532609003},
  {"id": 77,"type": "person","timestamp": 1532609005},
  {"id": 78,"type": "crane","timestamp": 1532609005}
]

Code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

# A non-time-based window spec (partitionBy/orderBy); this is what triggers the error below
diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

I get this error:

pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS prev_timestamp#129L]

Solution

pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets

This means that your window must be based on a timestamp column. So if you have a data point every second and you build a 30s window with a 10s stride, the resulting window column has start and end fields containing timestamps exactly 30s apart. For example, an event at 12:00:07 falls into three overlapping windows: [11:59:40, 12:00:10), [11:59:50, 12:00:20), and [12:00:00, 12:00:30).

You should use the window in this way:

import pyspark.sql.functions as F

# Cast the event-time column to TimestampType
words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))

# 30-second windows, sliding every 10 seconds
w = F.window('date_time', '30 seconds', '10 seconds')

# The watermark must be set on the same event-time column used by the window
words = words \
    .withWatermark('date_time', '1 minute') \
    .groupBy(w).agg(F.mean('value'))
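
For the asker's stream, the same fix means casting the epoch-second timestamp column to a real timestamp and aggregating per time window instead of calling lag(). Below is a minimal sketch under that assumption; the event_time column name, the 30s/10s window sizes, and the min/max aggregation (standing in for the lag difference) are illustrative choices, not part of the original answer:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

spark = SparkSession.builder.appName("Test").master("local[2]").getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark.readStream.format("json").schema(schema).load("data/")

# Casting a long to timestamp interprets it as seconds since the epoch
ds = ds.withColumn("event_time", F.col("timestamp").cast("timestamp"))

# Time-based window: 30-second windows sliding every 10 seconds, grouped per id
windowed = ds \
    .withWatermark("event_time", "1 minute") \
    .groupBy(F.window("event_time", "30 seconds", "10 seconds"), "id") \
    .agg(F.min("event_time").alias("first_seen"),
         F.max("event_time").alias("last_seen"))

query = windowed \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

With a watermark plus append output mode, each window is emitted once the watermark passes its end, and events arriving more than one minute late are dropped.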
