Pyspark 中的加权移动平均线 [英] Weighted moving average in Pyspark
本文介绍了Pyspark 中的加权移动平均线的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在为 Pyspark 中的时间序列编写异常检测算法.我想计算 (-3,3) 或 (-4,4) 窗口的加权移动平均值.现在我正在使用滞后和超前窗函数并将它们乘以一组权重.我的窗口目前是 (-2,2).
I'm writing an anomaly detection algorithm for time series in Pyspark. I want to calculate a weighted moving average of a (-3,3) or (-4,4) window. Right now I am using lag and lead over window functions and multiplying them by a set of weights. My window currently is (-2,2).
我想知道在Pyspark中是否有另一种计算加权移动平均线的方法.
I want to know if there is another way to calculate the weighted moving average in Pyspark.
我使用的当前代码是:
data_frame_1 = spark_data_frame.withColumn("weighted_score_predicted", (weights[0] * lag(column_metric, 1).over(w) + weights[1] * lag(column_metric, 2).over(w) + weights[2] * lead(column_metric, 1).over(w) + weights[3] * lead(column_metric, 2).over(w)) / 2).na.drop()
推荐答案
您可以概括您当前的代码:
You can generalize your current code:
from pyspark.sql.functions import coalesce, lit, col, lead, lag
from operator import add
from functools import reduce
def weighted_average(c, window, offsets, weights):
assert len(weights) == len(offsets)
def value(i):
if i < 0: return lag(c, -i).over(window)
if i > 0: return lead(c, i).over(window)
return c
# Create a list of Columns
# - `value_i * weight_i` if `value_i IS NOT NULL`
# - literal 0 otherwise
values = [coalesce(value(i) * w, lit(0)) for i, w in zip(offsets, weights)]
# or sum(values, lit(0))
return reduce(add, values, lit(0))
它可以用作:
from pyspark.sql.window import Window
df = spark.createDataFrame([
("a", 1, 1.4), ("a", 2, 8.0), ("a", 3, -1.0), ("a", 4, 2.4),
("a", 5, 99.0), ("a", 6, 3.0), ("a", 7, -1.0), ("a", 8, 0.0)
]).toDF("id", "time", "value")
w = Window.partitionBy("id").orderBy("time")
offsets, delays = [-2, -1, 0, 1, 2], [0.1, 0.20, 0.4, 0.20, 0.1]
result = df.withColumn("avg", weighted_average(
col("value"), w, offsets, delays
))
result.show()
## +---+----+-----+-------------------+
## | id|time|value| avg|
## +---+----+-----+-------------------+
## | a| 1| 1.4| 2.06|
## | a| 2| 8.0| 3.5199999999999996|
## | a| 3| -1.0| 11.72|
## | a| 4| 2.4| 21.66|
## | a| 5| 99.0| 40.480000000000004|
## | a| 6| 3.0| 21.04|
## | a| 7| -1.0| 10.1|
## | a| 8| 0.0|0.10000000000000003|
## +---+----+-----+-------------------+
注意:
您可以考虑对缺少滞后的帧的结果进行标准化:
You might consider normalizing the results for frames with missing lags:
result.withColumn(
"normalization_factor",
weighted_average(lit(1), w, offsets, delays)
).withColumn(
"normalized_avg",
col("avg") / col("normalization_factor")
).show()
## +---+----+-----+-------------------+--------------------+------------------+
## | id|time|value| avg|normalization_factor| normalized_avg|
## +---+----+-----+-------------------+--------------------+------------------+
## | a| 1| 1.4| 2.06| 0.7000000000000001|2.9428571428571426|
## | a| 2| 8.0| 3.5199999999999996| 0.9|3.9111111111111105|
## | a| 3| -1.0| 11.72| 1.0000000000000002|11.719999999999999|
## | a| 4| 2.4| 21.66| 1.0000000000000002|21.659999999999997|
## | a| 5| 99.0| 40.480000000000004| 1.0000000000000002| 40.48|
## | a| 6| 3.0| 21.04| 1.0000000000000002|21.039999999999996|
## | a| 7| -1.0| 10.1| 0.9000000000000001| 11.22222222222222|
## | a| 8| 0.0|0.10000000000000003| 0.7000000000000001|0.1428571428571429|
## +---+----+-----+-------------------+--------------------+------------------+
这篇关于Pyspark 中的加权移动平均线的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文