Pandas UDF在PySpark中的改进 [英] Improve Pandas UDF in Pyspark
本文介绍了Pandas UDF在PySpark中的改进的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须执行以下操作:
- 一次考虑100天的数据
- 组按ID的给定列
- 取聚合的最后一个值
- 求和并返回结果
这些任务必须在滑动窗口中使用.rangeBetween(-100 days, 0)
我可以很容易地通过构造一个Pandas UDF来实现这个结果,该UDF接受Pyspark DF的一些列作为输入,将它们转换为Pandas DataFrame,然后计算聚合并返回标量结果。然后将UDF应用于所需的滑动窗口。
尽管此解决方案运行良好,但由于DFS包含数百万行,因此它需要大量时间(3-4小时)才能完成任务。有没有办法改善这类运算的计算时间?我正在使用数据库中的火花源。
我的 pandas UDF是:
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()
并应用于:
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
编辑: 我知道这种聚合可以通过使用NumPy数组来实现,而使用NumPy结构工作时,PySpark UDF要快得多。然而,我想避免这个解决方案,因为我需要在相同的框架函数中应用这些函数,这些函数比所示的函数复杂得多,而且很难用NumPy复制。
推荐答案
我最近不得不实现一个类似的聚合,我的第一次尝试是使用带有滑动窗口的Pandas UDF。性能相当差,我通过使用以下方法设法改进了它。
尝试使用collect_list
组合滑动窗口向量,然后将它们映射到您的UDF。请注意,仅当您的滑动窗口适合工作内存时才有效(通常如此)。
以下是我的测试代码。第一部分只是您的代码,但作为一个完整的可重现示例。
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from pyspark.sql.types import FloatType, StructType, StructField, IntegerType, StringType
df = spark.createDataFrame(
[(1, "2021-04-01", 10, -30),
(1, "2021-03-01", 10, 20),
(1, "2021-02-01", 10, -1),
(1, "2021-01-01", 10, 10),
(1, "2020-12-01", 10, 5),
(1, "2021-04-01", 20, -5),
(1, "2021-03-01", 20, -4),
(1, "2021-02-01", 20, -3),
(2, "2021-03-01", 10, 5),
(2, "2021-02-01", 10, 6),
],
StructType([
StructField("csecid", StringType(), True),
StructField("date", StringType(), True),
StructField("analystid", IntegerType(), True),
StructField("revisions_improved", IntegerType(), True)
]))
### Baseline
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
# df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
建议的替代方案:
### Method 3
from typing import List
@udf(FloatType())
def method3(analyst: List[int], revisions: List[int]) -> float:
df = pd.DataFrame({
'analyst': analyst,
'revisions': revisions
})
return float(df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum())
(df
.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))
.withColumn('analyst_win', F.collect_list("analystid").over(w))
.withColumn('revisions_win', F.collect_list("revisions_improved").over(w))
.withColumn('method3', method3(F.collect_list("analystid").over(w),
F.collect_list("revisions_improved").over(w)))
.orderBy("csecid", "date", "analystid")
.show(truncate=False))
结果:
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|csecid|date |analystid|revisions_improved|new_col |analyst_win |revisions_win |method3 |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|1 |2020-12-01|10 |5 |1.0 |[10] |[5] |1.0 |
|1 |2021-01-01|10 |10 |1.0 |[10, 10] |[5, 10] |1.0 |
|1 |2021-02-01|10 |-1 |-1.0 |[10, 10, 10, 20] |[5, 10, -1, -3] |-1.0 |
|1 |2021-02-01|20 |-3 |-1.0 |[10, 10, 10, 20] |[5, 10, -1, -3] |-1.0 |
|1 |2021-03-01|10 |20 |0.6666667|[10, 10, 10, 20, 10, 20] |[5, 10, -1, -3, 20, -4] |0.6666667|
|1 |2021-03-01|20 |-4 |0.6666667|[10, 10, 10, 20, 10, 20] |[5, 10, -1, -3, 20, -4] |0.6666667|
|1 |2021-04-01|10 |-30 |-1.0 |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0 |
|1 |2021-04-01|20 |-5 |-1.0 |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0 |
|2 |2021-02-01|10 |6 |1.0 |[10] |[6] |1.0 |
|2 |2021-03-01|10 |5 |1.0 |[10, 10] |[6, 5] |1.0 |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
analyst_win
和revisions_win
只是为了展示如何创建滑动窗口并将其传递到UDF中。应在生产中删除它们。
repartition
增加分区数,以便每个任务执行较小的数据子集。
这篇关于Pandas UDF在PySpark中的改进的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文