How to calculate on a daily basis in a PySpark dataframe (time series)
Problem description
I have a dataframe and want to calculate some quantities on a daily basis. Say there are 10 columns, col1, col2, col3, col4, ..., coln, where each column depends on the values of col1, col2, col3, col4 and so on, and the dates restart for each id.
+----------+----+---+----+---+----+
|      date|col1| id|col2|...|coln|
+----------+----+---+----+---+----+
|2020-08-01|   0| M1| ...|...|   3|
|2020-08-02|   4| M1| ...|...|  10|
|2020-08-03|   3| M1| ...|...|   9|
|2020-08-04|   2| M1| ...|...|   8|
|2020-08-05|   1| M1| ...|...|   7|
|2020-08-06|   0| M1| ...|...|   0|
|2020-08-01|   0| M2| ...|...|   0|
|2020-08-02|   0| M2| ...|...|   1|
|2020-08-03|   0| M2| ...|...|   2|
+----------+----+---+----+---+----+
Say we process this dataframe; there could be a lot more columns in it.
To make this concrete, say today's date is 2020-08-01, we do some calculation, and we get an output in coln, say coln = 3 on 2020-08-01. I then want col1 on 2020-08-02 to equal that coln (col1 = 3), carry on the calculation for 2020-08-02, and so on. The dataframe would then look like this:
+----------+----+---+----+---+----+
|      date|col1| id|col2|...|coln|
+----------+----+---+----+---+----+
|2020-08-01|   0| M1| ...|...|   3|
|2020-08-02|   3| M1| ...|...|  10|
|2020-08-03|  10| M1| ...|...|   9|
|2020-08-04|   9| M1| ...|...|   8|
|2020-08-05|   8| M1| ...|...|   7|
|2020-08-06|   7| M1| ...|...|   0|
|2020-08-01|   0| M2| ...|...|   1|
|2020-08-02|   1| M2| ...|...|   2|
|2020-08-03|   2| M2| ...|...|   0|
+----------+----+---+----+---+----+
It would be great if you could give me an example of how this can be done in PySpark.
Example: say col3 = col1 + col2, and initially col1 is all 0.
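from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Reuse the active session (or start one) instead of relying on a predefined sqlContext
spark = SparkSession.builder.getOrCreate()

df1_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("coln", IntegerType(), True),
])
df_data = [('2020-08-01', 0, 'M1', 3, 3, 2), ('2020-08-02', 0, 'M1', 2, 3, 1),
           ('2020-08-03', 0, 'M1', 3, 3, 3), ('2020-08-04', 0, 'M1', 3, 3, 1),
           ('2020-08-01', 0, 'M2', 1, 3, 1), ('2020-08-02', 0, 'M2', -1, 3, 2)]
df1 = spark.createDataFrame(df_data, df1_schema)
# Parse the string column into a proper DateType column
df1 = df1.withColumn("Date", to_date("Date", "yyyy-MM-dd"))
df1.show()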
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 0| M1| 2| 3| 1|
|2020-08-03| 0| M1| 3| 3| 3|
|2020-08-04| 0| M1| 3| 3| 1|
|2020-08-01| 0| M2| 1| 3| 1|
|2020-08-02| 0| M2| -1| 3| 2|
+----------+----+---+----+----+----+
Let's focus on 2020-08-01, which is the beginning. There we want col3 = col1 + col2 = 3. After the nth calculation, which depends on col3, col4, col5 and so on, say we arrive at some number coln = 3. Once that calculation is done, on 2020-08-02 that coln = 3 should be in col1, so col1 changes dynamically after the 2020-08-01 calculation is complete.
My desired df would look like this:
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 2| M1| 2| 5| 1|
|2020-08-03| 1| M1| 3| 4| 3|
|2020-08-04| 3| M1| 3| 6| 1|
|2020-08-01| 1| M2| 1| 4| 1|
|2020-08-02| 1| M2| -1| 0| 2|
+----------+----+---+----+----+----+
EDIT 2:
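from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Reuse the active session (or start one) instead of relying on a predefined sqlContext
spark = SparkSession.builder.getOrCreate()

df1_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("col4", IntegerType(), True),
    StructField("coln", IntegerType(), True),
])
df_data = [('2020-08-01', 0, 'M1', 3, 3, 2, 2), ('2020-08-02', 0, 'M1', 2, 3, 0, 1),
           ('2020-08-03', 0, 'M1', 3, 3, 2, 3), ('2020-08-04', 0, 'M1', 3, 3, 2, 1),
           ('2020-08-01', 0, 'M2', 1, 3, 3, 1), ('2020-08-02', 0, 'M2', -1, 3, 1, 2)]
df1 = spark.createDataFrame(df_data, df1_schema)
# Parse the string column into a proper DateType column
df1 = df1.withColumn("Date", to_date("Date", "yyyy-MM-dd"))
df1.show()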
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| 2|
|2020-08-02| 0| M1| 2| 3| 0| 1|
|2020-08-03| 0| M1| 3| 3| 2| 3|
|2020-08-04| 0| M1| 3| 3| 2| 1|
|2020-08-01| 0| M2| 1| 3| 3| 1|
|2020-08-02| 0| M2| -1| 3| 1| 2|
+----------+----+---+----+----+----+----+
So let's say coln = col4 - col2; then the result would be:
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| -1|
|2020-08-02| -1| M1| 2| 1| 0| -2|
|2020-08-03| -2| M1| 3| 1| 2| -1|
|2020-08-04| -1| M1| 3| 2| 2| -1|
|2020-08-01| 0| M2| 1| 1| 3| 2|
|2020-08-02| 2| M2| -1| 1| 1| 2|
+----------+----+---+----+----+----+----+
This is the kind of problem you can handle with the Spark SQL built-in function aggregate (requires Spark 2.4+). The code below outlines the basic idea:
from pyspark.sql.functions import sort_array, collect_list, struct, to_date

cols = ['Date', 'col1', 'col2', 'col3', 'coln']

df_new = df1.groupby('id') \
    .agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
    .selectExpr("id", """
      inline(
        aggregate(
          /* expr: iterate through the array `dta` from the 2nd to the last items */
          slice(dta, 2, size(dta) - 1),
          /* start: AKA the zero value, which is an array of structs
           * with a single element dta[0]
           */
          array(dta[0]),
          /* merge: do the calculations */
          (acc, x) ->
            concat(acc, array(named_struct(
              'Date', x.Date,
              'col1', element_at(acc, -1).coln,
              'col2', x.col2,
              'col3', element_at(acc, -1).col3 + x.col2,
              'coln', x.col3 - x.col2
            )))
        )
      )
    """)
Output:
df_new.show()
+---+----------+----+----+----+----+
| id| Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01| 0| 3| 3| 2|
| M1|2020-08-02| 2| 2| 5| 1|
| M1|2020-08-03| 1| 3| 8| 0|
| M1|2020-08-04| 0| 3| 11| 0|
| M2|2020-08-01| 0| 1| 3| 1|
| M2|2020-08-02| 1| -1| 2| 4|
+---+----------+----+----+----+----+
Where:
- We group the rows with the same id and sort them by Date (Date is the first field of the struct, so sort_array orders by it), naming the resulting array of structs dta.
- In the aggregate function, we initialize acc with an array of structs, array(dta[0]), and then iterate through the array dta from the 2nd item to the last item using the slice function.
- In the merge part of the aggregate function, you can use x.col1, x.coln, etc. to refer to values on the same Date, and element_at(acc, -1).col1, element_at(acc, -1).coln, etc. to refer to the values from the previous Date.
- In the merge function, we use concat(acc, array(...)) to append a new element to the array of structs acc.
- We use the inline function to explode the resulting array of structs acc.
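To make the fold easier to follow, here is a minimal plain-Python sketch of what the aggregate call does for one id. It is not Spark code: functools.reduce stands in for aggregate, acc[-1] for element_at(acc, -1), and list concatenation for concat. The names rows, merge, and result are illustrative only; the data are the M1 rows from the question.

```python
from functools import reduce

# Rows for id = 'M1' from the question, already sorted by Date
# (this plays the role of the sorted array `dta`).
rows = [
    {"Date": "2020-08-01", "col1": 0, "col2": 3, "col3": 3, "coln": 2},
    {"Date": "2020-08-02", "col1": 0, "col2": 2, "col3": 3, "coln": 1},
    {"Date": "2020-08-03", "col1": 0, "col2": 3, "col3": 3, "coln": 3},
    {"Date": "2020-08-04", "col1": 0, "col2": 3, "col3": 3, "coln": 1},
]

def merge(acc, x):
    """Append one recomputed row, using the last row of acc as the previous day."""
    prev = acc[-1]  # same role as element_at(acc, -1)
    new_row = {
        "Date": x["Date"],
        "col1": prev["coln"],              # yesterday's coln becomes today's col1
        "col2": x["col2"],
        "col3": prev["col3"] + x["col2"],  # running value carried forward
        "coln": x["col3"] - x["col2"],
    }
    return acc + [new_row]  # same role as concat(acc, array(...))

# Start with the first row, then fold over the 2nd to last rows,
# like aggregate(slice(dta, 2, size(dta) - 1), array(dta[0]), merge).
result = reduce(merge, rows[1:], [rows[0]])

for r in result:
    print(r)
```

The col1, col3, and coln values this produces for M1 match the rows in the Spark output shown earlier.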
This assumes the Dates are continuous. If dates are missing, you can add some IF conditions, for example to calculate col3:
IF(datediff(x.Date, element_at(acc, -1).Date) = 1, element_at(acc, -1).coln, 0) + x.col2
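The effect of that IF condition can be sketched in plain Python as well. The input below is hypothetical: it is the M1 data with 2020-08-03 removed to create a gap, and the names rows, merge, and result are illustrative. The carried value is reset to 0 whenever the previous row is not exactly one day earlier.

```python
from datetime import date
from functools import reduce

# Hypothetical input: the M1 rows from the question with 2020-08-03 removed.
rows = [
    {"Date": date(2020, 8, 1), "col2": 3, "col3": 3, "coln": 2},
    {"Date": date(2020, 8, 2), "col2": 2, "col3": 3, "coln": 1},
    {"Date": date(2020, 8, 4), "col2": 3, "col3": 3, "coln": 1},
]

def merge(acc, x):
    prev = acc[-1]
    # Mirrors IF(datediff(x.Date, prev.Date) = 1, prev.coln, 0)
    consecutive = (x["Date"] - prev["Date"]).days == 1
    carried = prev["coln"] if consecutive else 0
    new_row = {
        "Date": x["Date"],
        "col2": x["col2"],
        "col3": carried + x["col2"],
        "coln": x["col3"] - x["col2"],
    }
    return acc + [new_row]

result = reduce(merge, rows[1:], [rows[0]])

for r in result:
    print(r["Date"], r["col3"], r["coln"])
```

On 2020-08-02 the previous day's coln (2) is carried over, so col3 is 4; on 2020-08-04 the previous row is two days old, so nothing is carried and col3 is just col2, i.e. 3.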
BTW, I did not use the example coln = col4 - col2; I used col3 = col3_prev + col2 instead, which I think is a better example.
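For completeness, the rule from EDIT 2 in the question (col1 is the previous day's coln, col3 = col1 + col2, coln = col4 - col2) can be checked with a small plain-Python sketch. This is only an illustration of the recurrence, not Spark code; roll_forward and the other names are made up for this example. In Spark the same merge pattern as above should apply, and because coln here does not depend on the previous row, a lag window function over each id would probably be enough.

```python
from itertools import groupby
from operator import itemgetter

# Rows from EDIT 2 in the question: (Date, col1, id, col2, col3, col4, coln)
df_data = [
    ("2020-08-01", 0, "M1", 3, 3, 2, 2), ("2020-08-02", 0, "M1", 2, 3, 0, 1),
    ("2020-08-03", 0, "M1", 3, 3, 2, 3), ("2020-08-04", 0, "M1", 3, 3, 2, 1),
    ("2020-08-01", 0, "M2", 1, 3, 3, 1), ("2020-08-02", 0, "M2", -1, 3, 1, 2),
]

def roll_forward(rows):
    """Recompute col1, col3 and coln day by day for a single id."""
    out, prev_coln = [], 0  # col1 starts at 0 on the first day
    for day, _col1, id_, col2, _col3, col4, _coln in rows:
        col1 = prev_coln     # yesterday's coln
        col3 = col1 + col2
        coln = col4 - col2
        out.append((day, col1, id_, col2, col3, col4, coln))
        prev_coln = coln
    return out

# Sort by id, then Date, and process each id independently.
ordered = sorted(df_data, key=itemgetter(2, 0))
result = [
    row
    for _, grp in groupby(ordered, key=itemgetter(2))
    for row in roll_forward(list(grp))
]

for row in result:
    print(row)
```

The rows produced this way agree with the desired table given under EDIT 2.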