How to calculate daily basis in pyspark dataframe (time series)

Question
So I have a dataframe and I want to calculate some quantity, let's say on a daily basis. Say we have 10 columns col1, col2, col3, col4 ... coln, where each column depends on the values of col1, col2, col3, col4 ... and so on, and the date resets based on the id:

+----------+----+---+----+-----+----+
|      date|col1| id|col2| ... |coln|
+----------+----+---+----+-----+----+
|2020-08-01|   0| M1|    | ... |   3|
|2020-08-02|   4| M1|    | ... |  10|
|2020-08-03|   3| M1|    | ... |   9|
|2020-08-04|   2| M1|    | ... |   8|
|2020-08-05|   1| M1|    | ... |   7|
|2020-08-06|   0| M1|    | ... |   0|
|2020-08-01|   0| M2|    | ... |   0|
|2020-08-02|   0| M2|    | ... |   1|
|2020-08-03|   0| M2|    | ... |   2|
+----------+----+---+----+-----+----+
Let's say we execute this dataframe; there could be a lot more columns in this df. To make this clear, say today's date is 2020-08-01. We do some calculation and get some output at coln, let's say coln = 3 at 2020-08-01, and I want coln == col1 at 2020-08-02, which is col1 == 3, and to carry on the calculation at 2020-08-02, and so on. So an example of the df looks like this:

+----------+----+---+----+-----+----+
|      date|col1| id|col2| ... |coln|
+----------+----+---+----+-----+----+
|2020-08-01|   0| M1|    | ... |   3|
|2020-08-02|   3| M1|    | ... |  10|
|2020-08-03|  10| M1|    | ... |   9|
|2020-08-04|   9| M1|    | ... |   8|
|2020-08-05|   8| M1|    | ... |   7|
|2020-08-06|   7| M1|    | ... |   0|
|2020-08-01|   0| M2|    | ... |   1|
|2020-08-02|   1| M2|    | ... |   2|
|2020-08-03|   2| M2|    | ... |   0|
+----------+----+---+----+-----+----+
It would be great if you guys could give me an example of how this can be done in PySpark. Example: let's say col3 = col1 + col2 and, initially, col1 is all 0.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_date

df1_schema = StructType([StructField("Date", StringType(), True),\
StructField("col1", IntegerType(), True),\
StructField("id", StringType(), True),\
StructField("col2", IntegerType(), True),\
StructField("col3", IntegerType(), True),\
StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2),('2020-08-02',0,'M1',2,3,1),\
('2020-08-03',0,'M1',3,3,3),('2020-08-04',0,'M1',3,3,1),\
('2020-08-01',0,'M2',1,3,1),('2020-08-02',0,'M2',-1,3,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 0| M1| 2| 3| 1|
|2020-08-03| 0| M1| 3| 3| 3|
|2020-08-04| 0| M1| 3| 3| 1|
|2020-08-01| 0| M2| 1| 3| 1|
|2020-08-02| 0| M2| -1| 3| 2|
+----------+----+---+----+----+----+
So let's focus on 2020-08-01, which is the beginning: what we want is col1 + col2, which is 3 = col3. After the nth calculation that depends on col3, col4, col5 ... let's say we get to some number, coln = 3. After that calculation is done, we want that coln = 3 to be at col1 on 2020-08-02, so it changes dynamically after the 2020-08-01 calculation is complete. My desired df would look like this:

+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 2| M1| 2| 5| 1|
|2020-08-03| 1| M1| 3| 4| 3|
|2020-08-04| 3| M1| 3| 6| 1|
|2020-08-01| 1| M2| 1| 4| 1|
|2020-08-02| 1| M2| -1| 0| 2|
+----------+----+---+----+----+----+
EDIT 2:

df1_schema = StructType([StructField("Date", StringType(), True),\
StructField("col1", IntegerType(), True),\
StructField("id", StringType(), True),\
StructField("col2", IntegerType(), True),\
StructField("col3", IntegerType(), True),\
StructField("col4", IntegerType(), True),\
StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2,2),('2020-08-02',0,'M1',2,3,0,1),\
('2020-08-03',0,'M1',3,3,2,3),('2020-08-04',0,'M1',3,3,2,1),\
('2020-08-01',0,'M2',1,3,3,1),('2020-08-02',0,'M2',-1,3,1,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| 2|
|2020-08-02| 0| M1| 2| 3| 0| 1|
|2020-08-03| 0| M1| 3| 3| 2| 3|
|2020-08-04| 0| M1| 3| 3| 2| 1|
|2020-08-01| 0| M2| 1| 3| 3| 1|
|2020-08-02| 0| M2| -1| 3| 1| 2|
+----------+----+---+----+----+----+----+
So let's say coln = col4 - col2; then:

+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| -1|
|2020-08-02| -1| M1| 2| 1| 0| -2|
|2020-08-03| -2| M1| 3| 1| 2| -1|
|2020-08-04| -1| M1| 3| 2| 2| -1|
|2020-08-01| 0| M2| 1| 1| 3| 2|
|2020-08-02| 2| M2| -1| 1| 1| 2|
+----------+----+---+----+----+----+----+
Answer

This is one type of question you can handle with the SparkSQL builtin function aggregate (requires Spark 2.4+). Below outlines the basic idea:

from pyspark.sql.functions import sort_array, collect_list, struct, to_date
cols = ['Date', 'col1', 'col2', 'col3', 'coln']
df_new = df1.groupby('id') \
.agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
.selectExpr("id", """
inline(
aggregate(
/* expr: iterate through the array `dta` from the 2nd to the last items*/
slice(dta,2,size(dta)-1),
/* start: AKA. the zero value which is an array of structs
* with a single element dta[0]
*/
array(dta[0]),
/* merge: do the calculations */
(acc, x) ->
concat(acc, array(named_struct(
'Date', x.Date,
'col1', element_at(acc, -1).coln,
'col2', x.col2,
'col3', element_at(acc, -1).col3 + x.col2,
'coln', x.col3 - x.col2
)))
)
)
""")
df_new.show()

Output:

+---+----------+----+----+----+----+
| id| Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01| 0| 3| 3| 2|
| M1|2020-08-02| 2| 2| 5| 1|
| M1|2020-08-03| 1| 3| 8| 0|
| M1|2020-08-04| 0| 3| 11| 0|
| M2|2020-08-01| 0| 1| 3| 1|
| M2|2020-08-02| 1| -1| 2| 4|
+---+----------+----+----+----+----+
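The aggregate call is just a left fold over each id's date-sorted rows. As a sanity check outside Spark, here is a plain-Python sketch of the same merge logic (the tuple layout and helper name are mine, not part of any Spark API); it reproduces the M1 rows of the output above:

```python
from functools import reduce

# Input rows for id M1, already sorted by Date: (Date, col1, col2, col3, coln)
rows = [
    ("2020-08-01", 0, 3, 3, 2),
    ("2020-08-02", 0, 2, 3, 1),
    ("2020-08-03", 0, 3, 3, 3),
    ("2020-08-04", 0, 3, 3, 1),
]

def merge(acc, x):
    """Mirror of the SQL merge lambda: acc[-1] is the previous result row,
    x is the current input row."""
    prev = acc[-1]
    date, _, col2, col3, _ = x
    new = (
        date,
        prev[4],         # col1 <- previous row's coln
        col2,            # col2 unchanged
        prev[3] + col2,  # col3 <- previous col3 + current col2
        col3 - col2,     # coln <- current col3 - current col2
    )
    return acc + [new]

# Zero value is [rows[0]]; fold over rows[1:], like slice(dta, 2, size(dta)-1).
result = reduce(merge, rows[1:], [rows[0]])
```

Each step only looks at the last element of the accumulated list, which is exactly what element_at(acc, -1) does in the SQL expression.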
Where:

- We group the rows for the same id and sort them by Date, naming the resulting array of structs dta.
- In the aggregate function, we initialize acc (the zero value) with an array of structs, array(dta[0]), and then iterate through the array dta from the 2nd item to the last item using the slice function.
- In the merge part of the aggregate function, you can use x.col1, x.coln etc. to refer to values on the current Date, and element_at(acc, -1).col1, element_at(acc, -1).coln etc. to refer to the values from the previous Date.
- concat(acc, array(...)) appends a new element to the array of structs acc.
- The inline function explodes the above array of structs acc.
- This assumes the Dates are continuous; if missing dates exist, you can add some IF conditions, for example to calculate col3 below:

  IF(datediff(x.Date, element_at(acc, -1).Date) = 1, element_at(acc, -1).coln, 0) + x.col2

BTW, I did not use the example coln = col4 - col2; using col3 = col3_prev + col2 instead is, I think, a better example.
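The date-gap guard mentioned above is easy to check outside Spark as well. This plain-Python sketch (a hypothetical helper standing in for the SQL datediff test) carries the previous day's coln forward only when exactly one day has passed:

```python
from datetime import date

def carried_coln(prev_date, prev_coln, cur_date):
    # Mirrors IF(datediff(x.Date, element_at(acc, -1).Date) = 1,
    #            element_at(acc, -1).coln, 0):
    # carry the previous row's coln only for consecutive dates.
    return prev_coln if (cur_date - prev_date).days == 1 else 0

consecutive = carried_coln(date(2020, 8, 1), 3, date(2020, 8, 2))  # carries 3
gap = carried_coln(date(2020, 8, 2), 5, date(2020, 8, 4))          # resets to 0
```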