Efficiently calculating weighted rolling average in Pyspark with some caveats


Question


I’m trying to calculate a rolling weighted avg over a window (partition by id1, id2 ORDER BY unixTime) in Pyspark and wanted to know if anyone had ideas on how to do this.

The rolling avg will take the current row’s value for a column, the 9 previous row values for that column and the 9 following row values for that column, and weight each value based on how far it is from the current row. So the current row is weighted 10x and the lag 1/lead 1 values are weighted 9x.

If none of the values are null, then the denominator for the weighted avg would be 100. The one caveat is that if there are null values, we still want to calculate a moving average (unless a little over 1/2 of the values are null).

So, for example, if the 9 values before the current val are null, the denominator would be 55. If over 1/2 of the values are null, then we would output NULL for the weighted average. We could also use logic along the lines of: if the denominator is less than 40 (or some threshold), output null.
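To make the weighting concrete, here is a tiny plain-Python sketch of the intended per-row computation (the helper name and the input layout are illustrative only, not part of any Spark code below):

def weighted_avg(vals):
    # vals: the 19 values centred on the current row, i.e. [lag9, ..., lag1, current, lead1, ..., lead9];
    # None entries are skipped in both the numerator and the denominator
    weights = list(range(1, 11)) + list(range(9, 0, -1))   # [1..10, 9..1], current row weighted 10x
    num = sum(v * w for v, w in zip(vals, weights) if v is not None)
    den = sum(w for v, w in zip(vals, weights) if v is not None)
    return None if den < 40 else num / den                 # NULL once too many values are missing

weighted_avg([None] * 9 + [1.0] * 10)   # the 9 lag values are null -> denominator is 55, as described above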

I've attached a screenshot to explain what I am saying in case it is confusing; hopefully this clears things up.

I know I could do this in SQL (and I could save the data frame as a temp view), but because I have to do this rolling avg for multiple columns (same exact logic), ideally if I could do it in Pyspark I would be able to write a for loop and then do it for each column. Also, I would love to do this efficiently. I’ve read many threads about rolling averages but think this situation is slightly different.

Sorry if I am overcomplicating this, hopefully it makes sense. If this isn't easy to do efficiently, I do know how to calculate it in SQL by listing lag(val, 10) over window... lag(val, 9) over window... etc. and can just go with that.
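For reference, a rough Pyspark sketch of that brute-force lag/lead approach (using the id1, id2, unixTime and val column names from the description above; df is the input frame) could look like this:

from pyspark.sql import functions as F, Window

w = Window.partitionBy('id1', 'id2').orderBy('unixTime')
# (column, weight) pairs: the current row plus 9 lags and 9 leads
terms = [(F.col('val'), 10)] \
    + [(F.lag('val', i).over(w), 10 - i) for i in range(1, 10)] \
    + [(F.lead('val', i).over(w), 10 - i) for i in range(1, 10)]
num = sum(F.when(c.isNotNull(), c * wt).otherwise(F.lit(0.0)) for c, wt in terms)
den = sum(F.when(c.isNotNull(), F.lit(wt)).otherwise(F.lit(0.0)) for c, wt in terms)
# NULL whenever the effective denominator drops below 40
df_rolled = df.withColumn('weighted_val', F.when(den >= 40, num / den))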

Solution

IIUC, one way you can try is to use the Window function collect_list to collect the surrounding rows, sort the list, find the position idx of the current row using array_position (requires Spark 2.4+), and then calculate the weights from that position. Let's use an example window of size=7 (N=3 in the code below):

from pyspark.sql.functions import expr, sort_array, collect_list, struct
from pyspark.sql import Window

df = spark.createDataFrame([
    (0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77),
    (5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)
], ["time", "val"])

N = 3

w1 = Window.partitionBy().orderBy('time').rowsBetween(-N,N)

# note that the index for array_position is 1-based, `i` in transform function is 0-based
df1 = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
    .withColumn('idx', expr("array_position(data, (time,val))-1")) \
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))

df1.show(truncate=False)
+----+----+-------------------------------------------------------------------------+---+----------------------+
|time|val |data                                                                     |idx|weights               |
+----+----+-------------------------------------------------------------------------+---+----------------------+
|0   |0.5 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7]]                                |0  |[10, 9, 8, 7]         |
|1   |0.6 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77]]                     |1  |[9, 10, 9, 8, 7]      |
|2   |0.65|[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8]]           |2  |[8, 9, 10, 9, 8, 7]   |
|3   |0.7 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7]] |3  |[7, 8, 9, 10, 9, 8, 7]|
|4   |0.77|[[1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9]] |3  |[7, 8, 9, 10, 9, 8, 7]|
|5   |0.8 |[[2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99]]|3  |[7, 8, 9, 10, 9, 8, 7]|
|6   |0.7 |[[3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]|3  |[7, 8, 9, 10, 9, 8, 7]|
|7   |0.9 |[[4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]          |3  |[7, 8, 9, 10, 9, 8]   |
|8   |0.99|[[5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]                     |3  |[7, 8, 9, 10, 9]      |
|9   |0.95|[[6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]                               |3  |[7, 8, 9, 10]         |
+----+----+-------------------------------------------------------------------------+---+----------------------+

Then we can use the SparkSQL builtin function aggregate to calculate the sum of the weights and the weighted values:

N = 9

w1 = Window.partitionBy().orderBy('time').rowsBetween(-N,N)

df_new = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
    .withColumn('idx', expr("array_position(data, (time,val))-1")) \
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))\
    .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
    .withColumn('weighted_val', expr("""
      aggregate(
        zip_with(data,weights, (x,y) -> x.val*y),
        0D, 
        (acc,x) -> acc+x,
        acc -> acc/sum_weights
      )""")) \
    .drop("data", "idx", "sum_weights", "weights")

df_new.show()
+----+----+------------------+
|time| val|      weighted_val|
+----+----+------------------+
|   0| 0.5|0.6827272727272726|
|   1| 0.6|0.7001587301587302|
|   2|0.65|0.7169565217391304|
|   3| 0.7|0.7332876712328767|
|   4|0.77|            0.7492|
|   5| 0.8|0.7641333333333333|
|   6| 0.7|0.7784931506849315|
|   7| 0.9|0.7963768115942028|
|   8|0.99|0.8138095238095238|
|   9|0.95|0.8292727272727273|
+----+----+------------------+
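As a quick sanity check on the output above: with N=9 every row of this small dataset falls into every window, so for the last row (time=9) the weights are simply [1, 2, ..., 10] and sum_weights is 55. Reproducing that row in plain Python:

vals = [0.5, 0.6, 0.65, 0.7, 0.77, 0.8, 0.7, 0.9, 0.99, 0.95]
weights = range(1, 11)   # the current (last) row gets weight 10
print(sum(v * w for v, w in zip(vals, weights)) / sum(weights))   # matches weighted_val for time=9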

Notes:

  • you can calculate multiple columns by setting struct('time','val1','val2') in the first line when calculating df_new, and then adjusting the corresponding idx calculation and the x.val*y term in weighted_val etc. (see the EDIT below for a full example).

  • to output NULL when fewer than half of the values can be collected, add an IF(size(data) <= 9, NULL, ...) or IF(sum_weights < 40, NULL, ...) condition, as shown below (a sketch of the sum_weights variant follows as well):

      df_new = df.withColumn(...) \
      ...
          .withColumn('weighted_val', expr(""" IF(size(data) <= 9, NULL, 
            aggregate( 
              zip_with(data,weights, (x,y) -> x.val*y), 
              0D,  
              (acc,x) -> acc+x, 
              acc -> acc/sum_weights 
           ))""")) \
          .drop("data", "idx", "sum_weights", "weights")
    

EDIT: for multiple columns, you can try:

cols = ['val1', 'val2', 'val3']

# function to set SQL expression to calculate weighted values for the field `val`
weighted_vals = lambda val: """
    aggregate(
      zip_with(data,weights, (x,y) -> x.{0}*y),
      0D,
      (acc,x) -> acc+x,
      acc -> acc/sum_weights
    ) as weighted_{0}
""".format(val)

df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
  .withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
  .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))")) \
  .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
  .selectExpr(df.columns + [ weighted_vals(c) for c in cols ])
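For reference, weighted_vals('val1') expands to (roughly) the SQL fragment below, and one such expression is generated per entry in cols. Each column therefore gets its own pass over data, which is fine for a handful of columns; the single-aggregate version further below avoids the repeated passes:

aggregate(
  zip_with(data, weights, (x,y) -> x.val1*y),
  0D,
  (acc,x) -> acc+x,
  acc -> acc/sum_weights
) as weighted_val1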

If the number of columns is limited, we can write the SQL expression to calculate the weighted vals with a single aggregate function:

df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
  .withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
  .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))")) \
  .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
  .withColumn("vals", expr(""" 
   aggregate( 
     zip_with(data, weights, (x,y) -> (x.val1*y as val1, x.val2*y as val2)),
     (0D as val1, 0D as val2), 
     (acc,x) -> (acc.val1 + x.val1, acc.val2 + x.val2),
     acc -> (acc.val1/sum_weights as weighted_val1, acc.val2/sum_weights as weighted_val2)
   )     
   """)).select(*df.columns, "vals.*")

