How to define UDAF over event-time windows in PySpark 2.1.0

Question

I'm writing a Python application that slides a window over a sequence of timestamped values. I want to apply a function to the values in the sliding window in order to compute a score from the N latest values, as shown in the figure. We have already implemented that function with a Python library so that it can make use of GPUs.

I found that Apache Spark 2.0 ships with Structured Streaming, which supports window operations on event time. If you want to read a finite sequence of records from a .csv file and count the records in such a sliding window, you can use the following code in PySpark:

from os import getcwd

from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Each input record is a timestamp plus a double value.
schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double')

# Treat the .csv files under ./csv as an unbounded input stream.
lines = spark \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///' + getcwd() + '/csv')

# Count values per 30-minute window, sliding every 10 minutes.
windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value': 'count'})

query = windowedCount \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

However, I want to apply UDAFs other than the predefined aggregation functions over sliding windows. According to https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg, the only available aggregate functions are avg, max, min, sum, and count.

Is this not supported yet? If so, when will it be supported in PySpark?

https://stackoverflow.com/a/32750733/1564381 shows that one can define a UserDefinedAggregateFunction in Java or Scala and then invoke it from PySpark. That looks interesting, but I want to apply my own Python function to the values in sliding windows. I want a purely Pythonic way.

P.S. Let me know about any Python frameworks other than PySpark that can solve this sort of problem (applying UDAFs over a window sliding over a stream).

Answer

In Spark <2.3, you cannot do this.
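
In the meantime, a common workaround is to collect each window's values into an array with collect_list and pass that array to an ordinary Python UDF. The sketch below assumes a batch job (it reuses the spark session and schema from the question, but reads with spark.read instead of readStream); the mean is only a hypothetical placeholder for the GPU-backed scoring function:

from os import getcwd

from pyspark.sql.functions import collect_list, udf, window
from pyspark.sql.types import DoubleType

# Batch read with the same schema as in the question.
batch = spark.read \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///' + getcwd() + '/csv')

# Gather each 30-minute window's values into a single array column.
windowed = batch.groupBy(
    window(batch.ts, '30 minutes', '10 minutes')
).agg(collect_list('value').alias('values'))

# Ordinary Python UDF over the collected array; the mean here is a
# placeholder for the real scoring function.
score = udf(lambda vs: float(sum(vs)) / len(vs), DoubleType())

scored = windowed.withColumn('score', score('values'))

This keeps everything in Python, but it materializes all of a window's values in a single task, so it is only practical when each window fits in memory.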

For Spark >= 2.3, this is possible for grouped data using "PySpark UDAFs with Pandas", but not yet for windows.
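
As a minimal sketch of what that looks like on grouped batch data (closely following the GROUPED_MAP example in the pandas_udf documentation; score_group and its mean-based body are hypothetical stand-ins for a real scoring function):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [('a', 1.0), ('a', 2.0), ('b', 3.0), ('b', 5.0)],
    ('key', 'value'))

# Spark hands each group to the function as one pandas DataFrame and
# expects a pandas DataFrame matching the declared schema back.
@pandas_udf('key string, score double', PandasUDFType.GROUPED_MAP)
def score_group(pdf):
    return pd.DataFrame({'key': [pdf['key'].iloc[0]],
                         'score': [float(pdf['value'].mean())]})

df.groupby('key').apply(score_group).show()

Each group arrives as a plain pandas DataFrame, so arbitrary Python code, including GPU libraries, can run inside the function.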

Currently, PySpark cannot run user-defined functions over windows.

Here is a well-described SO question on this: Applying UDFs on GroupedData in PySpark (with functioning python example)

Here is the JIRA ticket that added this feature - https://issues.apache.org/jira/browse/SPARK-10915
