pyspark: rolling average using timeseries data

Question

I have a dataset consisting of a timestamp column and a dollars column. I would like to find the average number of dollars per week ending at the timestamp of each row. I was initially looking at the pyspark.sql.functions.window function, but that bins the data by week.

Here's an example:

%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

This produces two records:

|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|

The window function binned the time series data rather than performing a rolling average.

Is there a way to perform a rolling average where I'll get back a weekly average for each row with a time period ending at the timestampGMT of the row?

Zhang's answer below is close to what I want, but not exactly what I'd like to see.

Here's a better example to show what I'm trying to get at:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                            (13, "2017-03-15T12:27:18+00:00"),
                            (25, "2017-03-18T11:27:18+00:00")],
                           ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average',
                   F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))

This produces the following dataframe:

dollars timestampGMT            rolling_average
25      2017-03-18 11:27:18.0   25
17      2017-03-10 15:27:18.0   15
13      2017-03-15 12:27:18.0   15

I'd like the average to be over the week preceding the date in the timestampGMT column, which would result in this:

dollars timestampGMT            rolling_average
17      2017-03-10 15:27:18.0   17
13      2017-03-15 12:27:18.0   15
25      2017-03-18 11:27:18.0   19

In the above results, the rolling_average for 2017-03-10 is 17, since there are no preceding records. The rolling_average for 2017-03-15 is 15 because it averages the 13 from 2017-03-15 and the 17 from 2017-03-10, which falls within the preceding 7-day window. The rolling_average for 2017-03-18 is 19 because it averages the 25 from 2017-03-18 and the 13 from 2017-03-15, which falls within the preceding 7-day window; it does not include the 17 from 2017-03-10 because that does not fall within the preceding 7-day window.
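
To make that arithmetic concrete, here is a small plain-Python sanity check (no Spark) of those trailing 7-day averages; it reuses the example timestamps with the UTC offsets dropped for brevity:

from datetime import datetime, timedelta

rows = [(17, "2017-03-10T15:27:18"),
        (13, "2017-03-15T12:27:18"),
        (25, "2017-03-18T11:27:18")]
parsed = [(dollars, datetime.fromisoformat(ts)) for dollars, ts in rows]

for dollars, ts in parsed:
    # keep every row whose timestamp falls within the 7 days ending at this row
    in_window = [d for d, t in parsed if ts - timedelta(days=7) <= t <= ts]
    print(ts.date(), sum(in_window) / len(in_window))

# 2017-03-10 17.0
# 2017-03-15 15.0
# 2017-03-18 19.0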

Is there a way to do this, rather than using a binning window where the weekly windows don't overlap?

Answer

I figured out the correct way to calculate a moving/rolling average using this Stack Overflow question:

Spark Window Functions - rangeBetween dates

The basic idea is to convert your timestamp column to seconds, and then you can use the rangeBetween function in the pyspark.sql.Window class to include the correct rows in your window.

Here's the solved example:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                            (13, "2017-03-15T12:27:18+00:00"),
                            (25, "2017-03-18T11:27:18+00:00")],
                           ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))

This results in the exact column of rolling averages that I was looking for:

dollars   timestampGMT            rolling_average
17        2017-03-10 15:27:18.0   17.0
13        2017-03-15 12:27:18.0   15.0
25        2017-03-18 11:27:18.0   19.0
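
As a side note, if the data contained several independent series (say a hypothetical customer_id column), the same trailing window could be applied per group by adding a partitionBy clause. This is only a sketch under that assumption, not part of the original solution:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda i: i * 86400

# hypothetical: one rolling average per customer_id, each over its own trailing 7-day window
# (reuses a df with dollars and timestampGMT columns, as in the solution above)
w = (Window.partitionBy("customer_id")
           .orderBy(F.col("timestampGMT").cast("long"))
           .rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))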
