Pyspark: How to code a complicated DataFrame calculation
Problem description
The dataframe is already sorted by date. The col1 == 1 value is unique, and once col1 == 1 appears, col1 keeps incrementing by 1 (e.g. 1, 2, 3, 4, 5, 6, 7 ...); only the -1 values are duplicates.
I have a dataframe that looks like this; call it df:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

TEST_schema = StructType([StructField("date", StringType(), True),
                          StructField("col1", IntegerType(), True),
                          StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-01',-1,-1),('2020-08-02',-1,-1),('2020-08-03',-1,3),('2020-08-04',-1,2),('2020-08-05',1,4),
             ('2020-08-06',2,1),('2020-08-07',3,2),('2020-08-08',4,3),('2020-08-09',5,-1)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df.show()
+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-01|  -1|  -1|
|2020-08-02|  -1|  -1|
|2020-08-03|  -1|   3|
|2020-08-04|  -1|   2|
|2020-08-05|   1|   4|
|2020-08-06|   2|   1|
|2020-08-07|   3|   2|
|2020-08-08|   4|   3|
|2020-08-09|   5|  -1|
+----------+----+----+
The condition is: when col1 == 1, we start counting up backwards (toward earlier dates) from col2 == 4 (e.g. 4, 5, 6, 7, 8, ...), and every row after the col2 == 4 row returns 0 all the way down (e.g. 4, 0, 0, 0, 0 ...).
So my resulting df will look something like this (a small plain-Python sketch of the rule follows the table):
+----------+----+----+----+
|      date|col1|col2|want|
+----------+----+----+----+
|2020-08-01|  -1|  -1|   8|
|2020-08-02|  -1|  -1|   7|
|2020-08-03|  -1|   3|   6|
|2020-08-04|  -1|   2|   5|
|2020-08-05|   1|   4|   4|
|2020-08-06|   2|   1|   0|
|2020-08-07|   3|   2|   0|
|2020-08-08|   4|   3|   0|
|2020-08-09|   5|  -1|   0|
+----------+----+----+----+
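Just to make the rule concrete, here is a minimal plain-Python sketch (illustration only, not my Spark code) that reproduces the want column above from the sample values:

# Minimal plain-Python illustration of the rule, rows ordered by date ascending.
col1 = [-1, -1, -1, -1, 1, 2, 3, 4, 5]
col2 = [-1, -1, 3, 2, 4, 1, 2, 3, -1]

pos = col1.index(1)                                # row where col1 == 1
want = [col2[pos] + (pos - i) if i <= pos else 0   # count up going backwards from col2 at that row
        for i in range(len(col1))]
print(want)                                        # [8, 7, 6, 5, 4, 0, 0, 0, 0]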
Enhancement: I want to add an additional condition: when col2 == -1 on the col1 == 1 row (at 2020-08-05) and the -1s in col2 continue consecutively, then I want to count those consecutive -1s and add the col2 value of the row where the run breaks. Here is an example to make it clear:
+----------+----+----+----+
|      date|col1|col2|want|
+----------+----+----+----+
|2020-08-01|  -1|  -1|  11|
|2020-08-02|  -1|  -1|  10|
|2020-08-03|  -1|   3|   9|
|2020-08-04|  -1|   2|   8|
|2020-08-05|   1|  -1|  7*|
|2020-08-06|   2|  -1|   0|
|2020-08-07|   3|  -1|   0|
|2020-08-08|   4|  4*|   0|
|2020-08-09|   5|  -1|   0|
+----------+----+----+----+
So here we see 3 consecutive -1s (starting from 2020-08-05; we only care about the first consecutive run of -1s), and after the run we have 4 (at 2020-08-08, marked with *), so we would have 4 + 3 = 7 at the col1 == 1 row. Is it possible?
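Again just as an illustration (plain Python, not my Spark code), this is how I think of the enhanced starting value:

# Starting at the col1 == 1 row, count the consecutive -1s in col2 and
# add the col2 value of the row that breaks the run.
col2_from_col1_row = [-1, -1, -1, 4, -1]   # col2 values from 2020-08-05 onward

run = 0
while run < len(col2_from_col1_row) and col2_from_col1_row[run] == -1:
    run += 1                               # run == 3 consecutive -1s

start = col2_from_col1_row[run] + run      # 4 + 3 == 7, the want value at the col1 == 1 row
print(start)                               # 7; earlier dates then continue 8, 9, 10, 11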
**My first attempt**
import sys
from pyspark.sql import Window
from pyspark.sql.functions import col, when, sum

TEST_df = TEST_df.withColumn('cumsum',
    sum(when(col('col1') < 1, col('col1'))
        .otherwise(when(col('col1') == 1, 1).otherwise(0)))
    .over(Window.partitionBy('col1').orderBy().rowsBetween(-sys.maxsize, 0)))
TEST_df.show()
+----------+----+----+------+
| date|col1|col2|cumsum|
+----------+----+----+------+
|2020-08-01| -1| -1| -1|
|2020-08-02| -1| -1| -2|
|2020-08-03| -1| 3| -3|
|2020-08-04| -1| 2| -4|
|2020-08-05| 1| 4| 1|
|2020-08-07| 3| 2| 0|
|2020-08-09| 5| -1| 0|
|2020-08-08| 4| 3| 0|
|2020-08-06| 2| 1| 0|
+----------+----+----+------+
from pyspark.sql.functions import desc, rank

w1 = Window.orderBy(desc('date'))
w2 = Window.partitionBy('case').orderBy(desc('cumsum'))

TEST_df.withColumn('case', sum(when((col('cumsum') == 1) & (col('col2') != -1), col('col2'))
                               .otherwise(0)).over(w1)) \
    .withColumn('rank', when(col('case') != 0, rank().over(w2) - 1).otherwise(0)) \
    .withColumn('want', col('case') + col('rank')) \
    .orderBy('date') \
    .show(truncate=False)
+----------+----+----+------+----+----+----+
|date |col1|col2|cumsum|case|rank|want|
+----------+----+----+------+----+----+----+
|2020-08-01|-1 |-1 |-1 |4 |1 |5 |
|2020-08-02|-1 |-1 |-2 |4 |2 |6 |
|2020-08-03|-1 |3 |-3 |4 |3 |7 |
|2020-08-04|-1 |2 |-4 |4 |4 |8 |
|2020-08-05|1 |4 |1 |4 |0 |4 |
|2020-08-06|2 |1 |0 |0 |0 |0 |
|2020-08-07|3 |2 |0 |0 |0 |0 |
|2020-08-08|4 |3 |0 |0 |0 |0 |
|2020-08-09|5 |-1 |0 |0 |0 |0 |
+----------+----+----+------+----+----+----+
You see the rank goes 1, 2, 3, 4; if I could make it 4, 3, 2, 1 it would look like my resulting dataframe... How do I reverse it? I tried orderBy both asc and desc... And of course this is all before the enhancement.
Recommended answer
IIUC, you can try the following:
1. groupby and create a collect_list of all related rows (vals in the code below), and sort the list by date in descending order. (Note: change groupby(lit(1)) to whatever columns you can use to divide your data into independent subsets.)
2. Find the array index idx of the first row having col1 == 1.
3. If col2 == -1 at idx, then find the offset from idx back toward the beginning of the list to the first row having col2 != -1. (Note: in the current code, offset might be NULL if all col2 values before idx are -1; you will have to decide what you want in that case, for example use coalesce(IF(...), 0).)
4. Once we have offset and idx, the want column can be calculated by:
IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
5. Use the Spark SQL function inline to explode the array of structs.
Note: the same logic can be applied using a Window function instead, in case too many columns exist in your production dataframe (see the alternative at the end).
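Before the full query, here is a quick plain-Python sanity check of that formula on the enhanced sample data (the list below mirrors vals, i.e. the rows sorted by date in descending order):

# Plain-Python sanity check of the want formula, mirroring vals.
vals = [('2020-08-09', 5, -1), ('2020-08-08', 4, 4), ('2020-08-07', 3, -1),
        ('2020-08-06', 2, -1), ('2020-08-05', 1, -1), ('2020-08-04', -1, 2),
        ('2020-08-03', -1, 3), ('2020-08-02', -1, -1), ('2020-08-01', -1, -1)]

idx = next(i for i, r in enumerate(vals) if r[1] == 1)      # 4 -> 2020-08-05
offset = 0
if vals[idx][2] == -1:                                      # col2 == -1 at idx
    offset = next(i for i in range(1, idx + 1)
                  if vals[idx - i][2] != -1)                 # 3 -> 2020-08-08 (col2 = 4)
want = [0 if i < idx else vals[idx - offset][2] + offset + i - idx
        for i in range(len(vals))]
print(want)   # [0, 0, 0, 0, 7, 8, 9, 10, 11]  (2020-08-09 ... 2020-08-01)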
The code is as follows:
from pyspark.sql.functions import sort_array, collect_list, struct, expr, lit

# enhanced sample data (col2 == -1 at the col1 == 1 row)
TEST_df = spark.createDataFrame([
    ('2020-08-01', -1, -1), ('2020-08-02', -1, -1), ('2020-08-03', -1, 3),
    ('2020-08-04', -1, 2), ('2020-08-05', 1, -1), ('2020-08-06', 2, -1),
    ('2020-08-07', 3, -1), ('2020-08-08', 4, 4), ('2020-08-09', 5, -1)
], ['date', 'col1', 'col2'])

# list of columns used in the calculation
cols = ["date", "col1", "col2"]

df_new = TEST_df \
    .groupby(lit(1)) \
    .agg(sort_array(collect_list(struct(*cols)), False).alias('vals')) \
    .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
    .withColumn('offset', expr("""
        coalesce(IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0], 0), 0)
    """)) \
    .selectExpr("""
        inline(
          transform(vals, (x,i) -> named_struct(
            'dta', x,
            'want', IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
          ))
        )
    """) \
    .select('dta.*', 'want')
Output:
df_new.orderBy('date').show()
+----------+----+----+----+
| date|col1|col2|want|
+----------+----+----+----+
|2020-08-01| -1| -1| 11|
|2020-08-02| -1| -1| 10|
|2020-08-03| -1| 3| 9|
|2020-08-04| -1| 2| 8|
|2020-08-05| 1| -1| 7|
|2020-08-06| 2| -1| 0|
|2020-08-07| 3| -1| 0|
|2020-08-08| 4| 4| 0|
|2020-08-09| 5| -1| 0|
+----------+----+----+----+
Per the comments, here is an alternative that uses a Window aggregate function instead of groupby:
from pyspark.sql import Window
# WindowSpec to cover all related Rows in the same partition
w1 = Window.partitionBy().orderBy('date').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
cols = ["date", "col1", "col2"]
# below `cur_idx` is the index for the current Row in array `vals`
df_new = TEST_df.withColumn('vals', sort_array(collect_list(struct(*cols)).over(w1),False)) \
.withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
.withColumn('offset', expr("IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0)")) \
.withColumn("cur_idx", expr("array_position(vals, struct(date,col1,col2))-1")) \
.selectExpr(*TEST_df.columns, "IF(cur_idx<idx, 0, vals[idx-offset].col2 + offset + cur_idx - idx) as want")
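Assuming the same TEST_df and the pyspark.sql.functions imports from the first snippet are in scope, this version should give the same result:

df_new.orderBy('date').show()

The idea, as I understand it, is that collect_list(...).over(w1) carries the sorted array on every row, so only the columns needed for the calculation go into the struct, while selectExpr(*TEST_df.columns, ...) keeps all the original columns without the groupby/explode round trip.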