火花指数移动平均线 [英] Spark Exponential Moving Average

查看:85
本文介绍了火花指数移动平均线的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个时间序列定价数据的数据框,其中包含ID,日期和价格.

I have a dataframe of timeseries pricing data, with an ID, Date and Price.

我需要计算价格"列的指数移动平均线,并将其作为新列添加到数据框中.

I need to compute the Exponential Moving Average for the Price Column, and add it as a new column to the dataframe.

我以前一直在使用Spark的窗口函数,它看起来很适合这种用例,但是给出了EMA的公式:

I have been using Spark's window functions before, and it looked like a fit for this use case, but given the formula for the EMA:

EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)

其中

multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now

我实际上如何在列上进行窗口浏览时,对如何访问列中的先前计算值感到有些困惑. 使用简单的移动平均线就很简单,因为您要做的就是在计算窗口中元素的平均时计算新列:

I got a bit confused as to how can I access to the previous computed value in the column, while actually window-ing over the column. With a simple moving average, it's simple, since all you need to do is compute a new column while averaging the elements in the window:

var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))

但是使用EMA似乎更加复杂,因为在每一步中我都需要先前的计算值.

But it seems that with EMA its a bit more complicated since at every step I need the previous computed value.

我还查看了 Pyspark中的加权移动平均值,但我需要一种方法适用于Spark/Scala,以及10或30天的EMA.

I have also looked at Weighted moving average in Pyspark but I need an approach for Spark/Scala, and for a 10 or 30 days EMA.

有什么想法吗?

推荐答案

最后,我分析了如何在熊猫数据框中实现指数移动平均值.除了我上面描述的递归公式,该递归公式很难在任何sql或window函数中实现(因为其递归),还有另一个递归公式,详细说明

In the end, I've analysed how exponential moving average is implemented in pandas dataframes. Besides the recursive formula which I described above, and which is difficult to implement in any sql or window function(because its recursive), there is another one, which is detailed on their issue tracker:

y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
       ((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).

鉴于此,并通过

Given this, and with additional spark implementation help from here, I ended up with the following implementation, which is roughly equivalent with doing pandas_dataframe.ewm(span=window_size).mean().

def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
  val window = Window.partitionBy(partitionColumn)
  val exponentialMovingAveragePrefix = "_EMA_"

  val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
    val alpha = 2.0 / (windowSize + 1)
    val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
      accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
    }
    (adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
  })
  dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
    .withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
    .drop("row_nr")
}

(我假设需要为其计算指数移动平均值的列的类型为Double.)

(I am presuming the type of the column for which I need to compute the exponential moving average is Double.)

我希望这对其他人有帮助.

I hope this helps others.

这篇关于火花指数移动平均线的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆