Pandas UDF在PySpark中的改进 [英] Improve Pandas UDF in Pyspark

查看:0
本文介绍了Pandas UDF在PySpark中的改进的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须执行以下操作:

  1. 一次考虑100天的数据
  2. 组按ID的给定列
  3. 取聚合的最后一个值
  4. 求和并返回结果

这些任务必须在滑动窗口中使用.rangeBetween(-100 days, 0)

进行计算

我可以很容易地通过构造一个Pandas UDF来实现这个结果,该UDF接受Pyspark DF的一些列作为输入,将它们转换为Pandas DataFrame,然后计算聚合并返回标量结果。然后将UDF应用于所需的滑动窗口。

尽管此解决方案运行良好,但由于DFS包含数百万行,因此它需要大量时间(3-4小时)才能完成任务。有没有办法改善这类运算的计算时间?我正在使用数据库中的火花源。

我的 pandas UDF是:

@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()

并应用于:

days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

编辑: 我知道这种聚合可以通过使用NumPy数组来实现,而使用NumPy结构工作时,PySpark UDF要快得多。然而,我想避免这个解决方案,因为我需要在相同的框架函数中应用这些函数,这些函数比所示的函数复杂得多,而且很难用NumPy复制。

推荐答案

我最近不得不实现一个类似的聚合,我的第一次尝试是使用带有滑动窗口的Pandas UDF。性能相当差,我通过使用以下方法设法改进了它。

尝试使用collect_list组合滑动窗口向量,然后将它们映射到您的UDF。请注意,仅当您的滑动窗口适合工作内存时才有效(通常如此)。

以下是我的测试代码。第一部分只是您的代码,但作为一个完整的可重现示例。

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from pyspark.sql.types import FloatType, StructType, StructField, IntegerType, StringType

df = spark.createDataFrame(
  [(1, "2021-04-01", 10, -30),
   (1, "2021-03-01", 10, 20),
   (1, "2021-02-01", 10, -1),
   (1, "2021-01-01", 10, 10),
   (1, "2020-12-01", 10, 5),
   (1, "2021-04-01", 20, -5),
   (1, "2021-03-01", 20, -4),
   (1, "2021-02-01", 20, -3),
   (2, "2021-03-01", 10, 5),
   (2, "2021-02-01", 10, 6),
  ], 
  StructType([
    StructField("csecid", StringType(), True), 
    StructField("date", StringType(), True), 
    StructField("analystid", IntegerType(), True), 
    StructField("revisions_improved", IntegerType(), True)
  ]))

### Baseline
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def method2(analyst: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum()

days = lambda x: x*60*60*24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)

# df.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

建议的替代方案:

### Method 3
from typing import List

@udf(FloatType())
def method3(analyst: List[int], revisions: List[int]) -> float:
  df = pd.DataFrame({
    'analyst': analyst,
    'revisions': revisions
  })
  return float(df.groupby('analyst').last()['revisions'].sum() / df.groupby('analyst').last()['revisions'].abs().sum())

(df
.withColumn('new_col', method2(F.col('analystid'), F.col('revisions_improved')).over(w))

.withColumn('analyst_win', F.collect_list("analystid").over(w))
.withColumn('revisions_win', F.collect_list("revisions_improved").over(w))

.withColumn('method3', method3(F.collect_list("analystid").over(w), 
                               F.collect_list("revisions_improved").over(w)))
.orderBy("csecid", "date", "analystid")
.show(truncate=False))

结果:

+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|csecid|date      |analystid|revisions_improved|new_col  |analyst_win                 |revisions_win                |method3  |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+
|1     |2020-12-01|10       |5                 |1.0      |[10]                        |[5]                          |1.0      |
|1     |2021-01-01|10       |10                |1.0      |[10, 10]                    |[5, 10]                      |1.0      |
|1     |2021-02-01|10       |-1                |-1.0     |[10, 10, 10, 20]            |[5, 10, -1, -3]              |-1.0     |
|1     |2021-02-01|20       |-3                |-1.0     |[10, 10, 10, 20]            |[5, 10, -1, -3]              |-1.0     |
|1     |2021-03-01|10       |20                |0.6666667|[10, 10, 10, 20, 10, 20]    |[5, 10, -1, -3, 20, -4]      |0.6666667|
|1     |2021-03-01|20       |-4                |0.6666667|[10, 10, 10, 20, 10, 20]    |[5, 10, -1, -3, 20, -4]      |0.6666667|
|1     |2021-04-01|10       |-30               |-1.0     |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0     |
|1     |2021-04-01|20       |-5                |-1.0     |[10, 10, 20, 10, 20, 10, 20]|[10, -1, -3, 20, -4, -30, -5]|-1.0     |
|2     |2021-02-01|10       |6                 |1.0      |[10]                        |[6]                          |1.0      |
|2     |2021-03-01|10       |5                 |1.0      |[10, 10]                    |[6, 5]                       |1.0      |
+------+----------+---------+------------------+---------+----------------------------+-----------------------------+---------+

analyst_winrevisions_win只是为了展示如何创建滑动窗口并将其传递到UDF中。应在生产中删除它们。

将Pandas Groupby移出UDF可能会提高性能。斯帕克可以照顾好这一步。然而,我没有质疑这一部分,因为您提到的函数不能代表实际任务。

查看SparkUI中的性能,特别是应用UDF的任务的时间统计信息。如果时间较长,请尝试使用repartition增加分区数,以便每个任务执行较小的数据子集。

这篇关于Pandas UDF在PySpark中的改进的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆