User defined function to be applied to Window in PySpark?


Problem Description

I am trying to apply a user defined function to a Window in PySpark. I have read that a UDAF might be the way to go, but I was not able to find anything concrete.

To give an example (taken from here: Xinh's Tech Blog and modified for PySpark):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                    ["Alice", "2016-05-03", 45.00],
                                    ["Alice", "2016-05-04", 55.00],
                                    ["Bob", "2016-05-01", 25.00],
                                    ["Bob", "2016-05-04", 29.00],
                                    ["Bob", "2016-05-06", 27.00]],
                               ["name", "date", "amountSpent"])

customers.show()

# Window over the previous, current, and next row within each name, ordered by date
window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

result = customers.withColumn("movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()

I am applying avg, which is already built into pyspark.sql.functions, but if instead of avg I wanted to use something more complicated and write my own function, how would I do that?

Answer

Spark >= 3.0

SPARK-24561 - User-defined window functions with pandas udf (bounded window) is a work in progress. Please follow the related JIRA for details.

Spark >= 2.4

SPARK-22239 - User-defined window functions with pandas udf (unbounded window) introduced support for Pandas-based window functions with unbounded windows. The general structure is:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

return_type: DataType  # the result type of the aggregation

@pandas_udf(return_type, PandasUDFType.GROUPED_AGG)
def f(v):
    return ...

# Only unbounded frames are supported for grouped-aggregate pandas UDFs
w = (Window
    .partitionBy(grouping_column)
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn('foo', f('bar').over(w))

Please see the doctests and the unit tests for detailed examples.
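To make the pattern concrete, here is a minimal sketch that reuses the customers DataFrame from the question and computes a per-customer average with a custom aggregation; the name mean_udf is hypothetical, and pandas plus PyArrow must be installed for pandas UDFs to run:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

# Custom aggregation: receives the window contents as a pandas Series
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Grouped-aggregate pandas UDFs in 2.4 require an unbounded frame
w = (Window
    .partitionBy("name")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

customers.withColumn("avgSpent", mean_udf(customers["amountSpent"]).over(w)).show()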

Spark < 2.4

You cannot. Window functions require a UserDefinedAggregateFunction or an equivalent object, not a UserDefinedFunction, and it is not possible to define one in PySpark.

However, in PySpark 2.3 or later you can define a vectorized pandas_udf, which can be applied on grouped data. You can find a working example in Applying UDFs on GroupedData in PySpark (with functioning python example). While Pandas doesn't provide a direct equivalent of window functions, it is expressive enough to implement any window-like logic, especially with pandas.DataFrame.rolling, as sketched below. Furthermore, a function used with GroupedData.apply can return an arbitrary number of rows.
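As a sketch of that idea under Spark 2.3, the bounded moving average from the question could be reproduced with a grouped-map pandas UDF and pandas.DataFrame.rolling; the schema string, the function name, and the min_periods choice are assumptions for illustration:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("name string, date string, amountSpent double, movingAvg double",
            PandasUDFType.GROUPED_MAP)
def moving_avg(pdf):
    # pdf is a pandas DataFrame holding one "name" group
    pdf = pdf.sort_values("date")
    # rolling(3, center=True) mimics rowsBetween(-1, 1);
    # min_periods=1 keeps partial windows at the group edges
    pdf["movingAvg"] = (pdf["amountSpent"]
                        .rolling(3, center=True, min_periods=1)
                        .mean())
    return pdf

customers.groupby("name").apply(moving_avg).show()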

You can also call a Scala UDAF from PySpark; see Spark: How to map Python with Scala or Java User Defined Functions?
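A hedged sketch of that route, assuming a Scala UserDefinedAggregateFunction has already been compiled into a jar on the driver's classpath (the class name com.example.MyAggregator and the SQL name myAgg are hypothetical): PySpark 2.3+ exposes spark.udf.registerJavaUDAF, after which the aggregate can be used in a SQL window expression.

# Register a Scala/Java UDAF under a SQL name (class name is hypothetical)
spark.udf.registerJavaUDAF("myAgg", "com.example.MyAggregator")

customers.createOrReplaceTempView("customers")
spark.sql("""
    SELECT name, date, amountSpent,
           myAgg(amountSpent) OVER (PARTITION BY name ORDER BY date
                                    ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS agg
    FROM customers
""").show()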
