How to define UDAF over event-time windows in PySpark 2.1.0


Problem description

I'm writing a Python application which slides a window over a sequence of timestamped values. I want to apply a function to the values in the sliding window in order to calculate a score from the N latest values, as shown in the figure. We have already implemented that function using a Python library so that it can make use of GPUs.
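
To make the goal concrete, here is a minimal pure-Python sketch of the intended computation; score() is only a stand-in for the GPU-backed function, and N and the sample data are illustrative assumptions:

# Sketch only: score() stands in for the GPU-backed scoring function;
# N and the sample series are illustrative, not from the application.
def score(values):
    return sum(values) / len(values)

N = 3
series = [(1, 10.0), (2, 12.0), (3, 11.0), (4, 13.0)]  # (timestamp, value)
for i in range(N, len(series) + 1):
    latest = [v for _, v in series[i - N:i]]           # N latest values
    print('t=%s score=%.2f' % (series[i - 1][0], score(latest)))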

I found that Apache Spark 2.0 ships with Structured Streaming, which supports window operations on event time. If you want to read a finite sequence of records from .csv files and count the records in such a sliding window, you can use the following code in PySpark:

from os import getcwd

from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Schema of the input records: an event-time timestamp and a value.
schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double')

# Stream CSV files from a ./csv directory under the working directory.
lines = spark \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///' + getcwd() + '/csv')

# Count the values in each 30-minute window, sliding every 10 minutes
# over event time (the ts column).
windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value': 'count'})

# Print the complete result table to the console after every trigger.
query = windowedCount \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()
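
For reference, the job above picks up CSV files as they appear in the ./csv directory. A hypothetical input file matching the (ts, value) schema could look like the lines below; whether these timestamps parse as-is depends on the reader's timestampFormat option:

2017-01-01 00:00:00,1.0
2017-01-01 00:12:00,2.5
2017-01-01 00:25:00,3.0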

However, I want to apply UDAFs other than predefined aggregation functions over sliding windows. According to https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg, the available aggregate functions are only avg, max, min, sum, and count.

Is it not supported yet? If so, when will it be supported in PySpark?

https://stackoverflow.com/a/32750733/1564381 shows that one can define a UserDefinedAggregateFunction in Java or Scala and then invoke it from PySpark. It seems interesting, but I want to apply my own Python function over the values in sliding windows. I want a purely Pythonic way.

P.S. Let me know of any frameworks in Python other than PySpark that can solve this sort of problem (applying UDAFs on a window sliding over a stream).

Recommended answer

In Spark < 2.3, you cannot do this.

For Spark >= 2.3, this is possible for grouped data using "PySpark UDAFs with Pandas", but not yet for windows.
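
For reference, a minimal sketch of what the Spark >= 2.3 grouped-data path looks like; df, the grouping column 'key', and the mean-based scoring are illustrative assumptions, and this applies to static groups rather than to streaming event-time windows:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Grouped-map pandas UDF: receives all rows of one group as a pandas
# DataFrame and must return a pandas DataFrame matching the declared
# schema. df is assumed to have columns key, ts, and value.
@pandas_udf('key string, ts timestamp, value double, score double',
            PandasUDFType.GROUPED_MAP)
def add_score(pdf):
    pdf = pdf.sort_values('ts')
    pdf['score'] = pdf['value'].mean()  # placeholder scoring logic
    return pdf

scored = df.groupBy('key').apply(add_score)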

Currently, PySpark cannot run user-defined functions over windows.
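
By way of illustration, the kind of attempt this rules out looks like the following; my_score is a hypothetical plain Python UDF, and Spark rejects it during analysis because it is not an aggregate function:

from pyspark.sql.functions import udf, window
from pyspark.sql.types import DoubleType

my_score = udf(lambda v: v, DoubleType())  # plain UDF, not an aggregate

# Fails analysis with an AnalysisException: a non-aggregate expression
# cannot be used inside agg() over a grouping.
lines.groupBy(window(lines.ts, '30 minutes', '10 minutes')) \
     .agg(my_score(lines.value))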

Here is a well-described SO question on this: Applying UDFs on GroupedData in PySpark (with functioning python example)

Here is the JIRA ticket that added this feature - https://issues.apache.org/jira/browse/SPARK-10915
