Efficiently calculating weighted rolling average in Pyspark with some caveats


Problem description

I'm trying to calculate a rolling weighted average over a window (PARTITION BY id1, id2 ORDER BY unixTime) in Pyspark and wanted to know if anyone had ideas on how to do this.

The rolling average will take the current row's value for a column, the 9 previous row values for that column, and the 9 following row values for that column, and weight each value based on how far it is from the current row. So the current row is weighted 10x and the lag-1/lead-1 values are weighted 9x.

If none of the values are null, the denominator for the weighted average would be 100. The one caveat is that if there are null values, we still want to calculate a moving average (unless a little over 1/2 of the values are null).

So, for example, if the 9 values before the current value are null, the denominator would be 55. If over 1/2 of the values are null, then we would output NULL for the weighted average. We could also use logic that says: if the denominator is less than 40 (or some other threshold), output null.
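To make the denominators concrete, here is a quick arithmetic sketch of the weighting scheme described above (plain Python, no Spark needed): the current row gets weight 10 and the weight drops by 1 per step out to ±9 rows.

# Weights for offsets -9 .. +9 relative to the current row: 10 - |offset|
weights = {k: 10 - abs(k) for k in range(-9, 10)}

# All 19 values present: 10 + 2 * (9 + 8 + ... + 1) = 100
full_denominator = sum(weights.values())

# The 9 preceding values are null, so only offsets 0 .. +9 count: 10 + 45 = 55
trailing_only_denominator = sum(w for k, w in weights.items() if k >= 0)

print(full_denominator, trailing_only_denominator)  # 100 55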

I've attached a screenshot to explain what I am saying in case it is confusing; hopefully this clears things up:

I know I could do this in SQL (and I could save the data frame as a temp view), but because I have to do this rolling average for multiple columns (exact same logic), ideally if I could do it in Pyspark I would be able to write a for loop and then do it for each column. Also, I would love to do this efficiently. I've read many threads about rolling averages, but I think this situation is slightly different.

Sorry if I am overcomplicating this; hopefully it makes sense. If this isn't easy to do efficiently, I do know how to calculate it in SQL by listing lag(val, 10) over window... lag(val, 9) over window... etc., and can just go with that.
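For reference, the brute-force lag/lead approach mentioned above could be generated programmatically so the same logic is reused per column. This is only a minimal sketch, not the answer below: the table name tbl is hypothetical, while id1, id2, unixTime and val follow the question; null terms are dropped from the numerator and their weights excluded from the denominator.

def weighted_avg_expr(col, n=9):
    # builds one SQL expression: sum(weight * value) / sum(weight), skipping nulls
    over = "OVER (PARTITION BY id1, id2 ORDER BY unixTime)"
    terms = [f"coalesce(10 * {col}, 0)"]
    wts = [f"IF({col} IS NOT NULL, 10, 0)"]
    for k in range(1, n + 1):
        for fn in ("lag", "lead"):
            v = f"{fn}({col}, {k}) {over}"
            terms.append(f"coalesce({10 - k} * ({v}), 0)")
            wts.append(f"IF(({v}) IS NOT NULL, {10 - k}, 0)")
    return f"({' + '.join(terms)}) / nullif({' + '.join(wts)}, 0) AS weighted_{col}"

# e.g. spark.sql(f"SELECT *, {weighted_avg_expr('val')} FROM tbl")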

Answer

IIUC, one way you can try is to use the Window function collect_list, sort the list, find the position idx of the current row using array_position (requires Spark 2.4+), and then calculate the weights based on that position. Let's use an example Window of size 7 (N=3 in the code below):

from pyspark.sql.functions import expr, sort_array, collect_list, struct
from pyspark.sql import Window

df = spark.createDataFrame([
    (0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77),
    (5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)
], ["time", "val"])

N = 3

# sliding window covering N preceding rows, the current row and N following rows (7 in total)
w1 = Window.partitionBy().orderBy('time').rowsBetween(-N, N)

# note that the index for array_position is 1-based, `i` in transform function is 0-based
df1 = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
    .withColumn('idx', expr("array_position(data, (time,val))-1")) \
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))

df1.show(truncate=False)
+----+----+-------------------------------------------------------------------------+---+----------------------+
|time|val |data                                                                     |idx|weights               |
+----+----+-------------------------------------------------------------------------+---+----------------------+
|0   |0.5 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7]]                                |0  |[10, 9, 8, 7]         |
|1   |0.6 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77]]                     |1  |[9, 10, 9, 8, 7]      |
|2   |0.65|[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8]]           |2  |[8, 9, 10, 9, 8, 7]   |
|3   |0.7 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7]] |3  |[7, 8, 9, 10, 9, 8, 7]|
|4   |0.77|[[1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9]] |3  |[7, 8, 9, 10, 9, 8, 7]|
|5   |0.8 |[[2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99]]|3  |[7, 8, 9, 10, 9, 8, 7]|
|6   |0.7 |[[3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]|3  |[7, 8, 9, 10, 9, 8, 7]|
|7   |0.9 |[[4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]          |3  |[7, 8, 9, 10, 9, 8]   |
|8   |0.99|[[5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]                     |3  |[7, 8, 9, 10, 9]      |
|9   |0.95|[[6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]                               |3  |[7, 8, 9, 10]         |
+----+----+-------------------------------------------------------------------------+---+----------------------+

Then we can use the SparkSQL builtin function aggregate to calculate the sum of the weights and the weighted values:

N = 9

# full-size window: 9 preceding rows, the current row and 9 following rows (19 in total)
w1 = Window.partitionBy().orderBy('time').rowsBetween(-N, N)

df_new = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
    .withColumn('idx', expr("array_position(data, (time,val))-1")) \
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))\
    .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
    .withColumn('weighted_val', expr("""
      aggregate(
        zip_with(data,weights, (x,y) -> x.val*y),
        0D, 
        (acc,x) -> acc+x,
        acc -> acc/sum_weights
      )""")) \
    .drop("data", "idx", "sum_weights", "weights")

df_new.show()
+----+----+------------------+
|time| val|      weighted_val|
+----+----+------------------+
|   0| 0.5|0.6827272727272726|
|   1| 0.6|0.7001587301587302|
|   2|0.65|0.7169565217391304|
|   3| 0.7|0.7332876712328767|
|   4|0.77|            0.7492|
|   5| 0.8|0.7641333333333333|
|   6| 0.7|0.7784931506849315|
|   7| 0.9|0.7963768115942028|
|   8|0.99|0.8138095238095238|
|   9|0.95|0.8292727272727273|
+----+----+------------------+
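As a quick sanity check (plain Python, outside Spark), the first output row can be reproduced by hand: for time=0 the window covers rows 0 through 9 of the sample data, idx is 0, and the weights run from 10 down to 1.

vals = [0.5, 0.6, 0.65, 0.7, 0.77, 0.8, 0.7, 0.9, 0.99, 0.95]
weights = [10 - i for i in range(len(vals))]   # idx = 0, so weight = 10 - |i - 0|

weighted = sum(v * w for v, w in zip(vals, weights)) / sum(weights)
print(weighted)  # ~0.682727..., matching weighted_val for time=0 above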

Some notes:

  • you can calculate multiple columns by setting struct('time','val1','val2') in the first line when calculating df_new and then adjusting the corresponding calculations of idx and x.val*y in weighted_val, etc. (see the multi-column code below)

  • to output NULL when fewer than half of the values can be collected, add an IF(size(data) <= 9, NULL, ...) or IF(sum_weights < 40, NULL, ...) guard to the expression, as follows:

  df_new = df.withColumn(...) \
  ...
      .withColumn('weighted_val', expr(""" IF(size(data) <= 9, NULL, 
        aggregate( 
          zip_with(data,weights, (x,y) -> x.val*y), 
          0D,  
          (acc,x) -> acc+x, 
          acc -> acc/sum_weights 
       ))""")) \
      .drop("data", "idx", "sum_weights", "weights")

For multiple columns, you can try:

cols = ['val1', 'val2', 'val3']

# function to set SQL expression to calculate weighted values for the field `val`
weighted_vals = lambda val: """
    aggregate(
      zip_with(data,weights, (x,y) -> x.{0}*y),
      0D,
      (acc,x) -> acc+x,
      acc -> acc/sum_weights
    ) as weighted_{0}
""".format(val)

df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
  .withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
  .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))")) \
  .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
  .selectExpr(df.columns + [ weighted_vals(c) for c in cols ])

If the number of columns is limited, we can write out the SQL expression to calculate the weighted values with a single aggregate function:

df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
  .withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
  .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))")) \
  .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
  .withColumn("vals", expr(""" 
   aggregate( 
     zip_with(data, weights, (x,y) -> (x.val1*y as val1, x.val2*y as val2)),
     (0D as val1, 0D as val2), 
     (acc,x) -> (acc.val1 + x.val1, acc.val2 + x.val2),
     acc -> (acc.val1/sum_weights as weighted_val1, acc.val2/sum_weights as weighted_val2)
   )     
   """)).select(*df.columns, "vals.*")
