Rolling average and sum by days over timestamp in PySpark


Problem description

I have a PySpark dataframe whose timestamp column has daily granularity. Here is an example of the dataframe (let's call it df):

+-----+-----+----------+-----+
| name| type| timestamp|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10|   11|
|name1|type1|2012-01-11|   14|
|name1|type1|2012-01-12|    2|
|name1|type3|2012-01-12|    3|
|name1|type3|2012-01-11|   55|
|name1|type1|2012-01-13|   10|
|name1|type2|2012-01-14|   11|
|name1|type2|2012-01-15|   14|
|name2|type2|2012-01-10|    2|
|name2|type2|2012-01-11|    3|
|name2|type2|2012-01-12|   55|
|name2|type1|2012-01-10|   10|
|name2|type1|2012-01-13|   55|
|name2|type1|2012-01-14|   10|
+-----+-----+----------+-----+

In this dataframe, I want to compute the average and the sum of scores for each name over a rolling time window of three days. That is, for any given day in the dataframe, I want the sum of name1's scores on that day, the day before it, and the day before that. I want to do the same for every day of name1, and then repeat the exercise for every other name (name2 and so on). How can I do this?
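To make the requested window concrete, here is a minimal plain-Python sketch (no Spark involved) that works out the expected result for name1 on 2012-01-12 from the sample rows above. The helper name `three_day_window` is hypothetical and only serves this illustration.

```python
from datetime import date, timedelta

# name1's (day, score) pairs up to 2012-01-13, copied from the sample dataframe
name1_scores = [
    (date(2012, 1, 10), 11),
    (date(2012, 1, 11), 14),
    (date(2012, 1, 12), 2),
    (date(2012, 1, 12), 3),
    (date(2012, 1, 11), 55),
    (date(2012, 1, 13), 10),
]


def three_day_window(scores, day):
    """Return the scores dated within [day - 2 days, day], both ends included."""
    start = day - timedelta(days=2)
    return [score for d, score in scores if start <= d <= day]


window = three_day_window(name1_scores, date(2012, 1, 12))
rolling_sum = sum(window)                # 11 + 14 + 2 + 3 + 55
rolling_avg = rolling_sum / len(window)  # averaged over rows, not over days
print(rolling_sum, rolling_avg)
```

Note that the average here is taken over the rows that fall inside the window (five rows in this case), not over the three calendar days.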

I took a look at this post and tried the following:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda i: i*1

w_rolling = Window.orderBy(F.col("timestamp").cast("long")).rangeBetween(-days(3), 0)
df_agg = df.withColumn("rolling_average", F.avg("score").over(w_rolling)).withColumn(
    "rolling_sum", F.sum("score").over(w_rolling)
)
df_agg.show()

+-----+-----+----------+-----+------------------+-----------+
| name| type| timestamp|score|   rolling_average|rolling_sum|
+-----+-----+----------+-----+------------------+-----------+
|name1|type1|2012-01-10|   11|18.214285714285715|        255|
|name1|type1|2012-01-11|   14|18.214285714285715|        255|
|name1|type1|2012-01-12|    2|18.214285714285715|        255|
|name1|type3|2012-01-12|    3|18.214285714285715|        255|
|name1|type3|2012-01-11|   55|18.214285714285715|        255|
|name1|type1|2012-01-13|   10|18.214285714285715|        255|
|name1|type2|2012-01-14|   11|18.214285714285715|        255|
|name1|type2|2012-01-15|   14|18.214285714285715|        255|
|name2|type2|2012-01-10|    2|18.214285714285715|        255|
|name2|type2|2012-01-11|    3|18.214285714285715|        255|
|name2|type2|2012-01-12|   55|18.214285714285715|        255|
|name2|type1|2012-01-10|   10|18.214285714285715|        255|
|name2|type1|2012-01-13|   55|18.214285714285715|        255|
|name2|type1|2012-01-14|   10|18.214285714285715|        255|
+-----+-----+----------+-----+------------------+-----------+

As you can see, I always get the same rolling average and rolling sum, which are simply the average and sum of the score column over all rows. This is not what I want.

You can create the dataframe shown above using the following code snippet:


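from pyspark.sql import Row, SparkSession

# Assumption: no session is active yet; getOrCreate() reuses one if it is
spark = SparkSession.builder.getOrCreate()

# Row factory that fixes the column names for every record below
df_Stats = Row("name", "type", "timestamp", "score")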

df_stat1 = df_Stats("name1", "type1", "2012-01-10", 11)
df_stat2 = df_Stats("name1", "type1", "2012-01-11", 14)
df_stat3 = df_Stats("name1", "type1", "2012-01-12", 2)
df_stat4 = df_Stats("name1", "type3", "2012-01-12", 3)
df_stat5 = df_Stats("name1", "type3", "2012-01-11", 55)
df_stat6 = df_Stats("name1", "type1", "2012-01-13", 10)
df_stat7 = df_Stats("name1", "type2", "2012-01-14", 11)
df_stat8 = df_Stats("name1", "type2", "2012-01-15", 14)
df_stat9 = df_Stats("name2", "type2", "2012-01-10", 2)
df_stat10 = df_Stats("name2", "type2", "2012-01-11", 3)
df_stat11 = df_Stats("name2", "type2", "2012-01-12", 55)
df_stat12 = df_Stats("name2", "type1", "2012-01-10", 10)
df_stat13 = df_Stats("name2", "type1", "2012-01-13", 55)
df_stat14 = df_Stats("name2", "type1", "2012-01-14", 10)

df_stat_lst = [
    df_stat1,
    df_stat2,
    df_stat3,
    df_stat4,
    df_stat5,
    df_stat6,
    df_stat7,
    df_stat8,
    df_stat9,
    df_stat10,
    df_stat11,
    df_stat12,
    df_stat13,
    df_stat14
]

df = spark.createDataFrame(df_stat_lst)

Recommended answer

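You can use the code below to calculate the sum and average of score over the last 3 days, including the current day. It differs from the attempt in the question in two ways: the window is partitioned by name, so each name gets its own rolling frame, and it is ordered by a Unix time in seconds, so the range bounds are given in seconds (2 days = 2*86400). In the original attempt the window had no partition, and casting the date string directly to long most likely yields null for every row, which would explain why all rows ended up in a single frame.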

# Considering the dataframe already created using code provided in question
df = df.withColumn('unix_time', F.unix_timestamp('timestamp', 'yyyy-MM-dd'))

winSpec = Window.partitionBy('name').orderBy('unix_time').rangeBetween(-2*86400, 0)

df = df.withColumn('rolling_sum', F.sum('score').over(winSpec))
df = df.withColumn('rolling_avg', F.avg('score').over(winSpec))

df.orderBy('name', 'timestamp').show(20, False)

+-----+-----+----------+-----+----------+-----------+------------------+
|name |type |timestamp |score|unix_time |rolling_sum|rolling_avg       |
+-----+-----+----------+-----+----------+-----------+------------------+
|name1|type1|2012-01-10|11   |1326153600|11         |11.0              |
|name1|type3|2012-01-11|55   |1326240000|80         |26.666666666666668|
|name1|type1|2012-01-11|14   |1326240000|80         |26.666666666666668|
|name1|type1|2012-01-12|2    |1326326400|85         |17.0              |
|name1|type3|2012-01-12|3    |1326326400|85         |17.0              |
|name1|type1|2012-01-13|10   |1326412800|84         |16.8              |
|name1|type2|2012-01-14|11   |1326499200|26         |6.5               |
|name1|type2|2012-01-15|14   |1326585600|35         |11.666666666666666|
|name2|type1|2012-01-10|10   |1326153600|12         |6.0               |
|name2|type2|2012-01-10|2    |1326153600|12         |6.0               |
+-----+-----+----------+-----+----------+-----------+------------------+
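The frame bounds in the answer can be checked by hand. The following is a plain-Python sketch, not Spark code, that mimics what `rangeBetween(-2*86400, 0)` does over `unix_time` for the name1 rows. It assumes dates are interpreted as midnight UTC, which matches the `unix_time` values in the output above; `F.unix_timestamp` follows the Spark session time zone, so other settings would shift the absolute values but not the day-to-day differences. The helper names `to_unix`, `range_frame` and `rolling` are hypothetical.

```python
from datetime import datetime, timezone

SECONDS_PER_DAY = 86400

# (name, timestamp, score) for name1, copied from the question's dataframe
rows = [
    ("name1", "2012-01-10", 11),
    ("name1", "2012-01-11", 14),
    ("name1", "2012-01-12", 2),
    ("name1", "2012-01-12", 3),
    ("name1", "2012-01-11", 55),
    ("name1", "2012-01-13", 10),
    ("name1", "2012-01-14", 11),
    ("name1", "2012-01-15", 14),
]


def to_unix(day):
    """Seconds since the epoch for midnight UTC of a 'yyyy-MM-dd' string."""
    dt = datetime.strptime(day, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def range_frame(rows, name, day, lower=-2 * SECONDS_PER_DAY, upper=0):
    """Scores of `name` whose unix time lies in [t + lower, t + upper]."""
    t = to_unix(day)
    return [s for n, d, s in rows if n == name and t + lower <= to_unix(d) <= t + upper]


def rolling(rows, name, day):
    """Return (sum, average) of the scores inside the range frame."""
    frame = range_frame(rows, name, day)
    return sum(frame), sum(frame) / len(frame)


for day in ["2012-01-10", "2012-01-13", "2012-01-14", "2012-01-15"]:
    print(day, to_unix(day), rolling(rows, "name1", day))
```

Because a range frame is defined on the ordering value rather than on row positions, two rows that share the same day (such as the two name1 rows on 2012-01-11) see the same frame and therefore get the same rolling sum and average.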
