Pyspark:如何编写复杂的数据框计算潜在客户总和 [英] Pyspark : how to code complicated dataframe calculation lead sum
问题描述
我给了看起来像这样的数据框. 此数据帧按日期排序,而col1只是一些随机值.
I have given dataframe that looks like this. THIS dataframe is sorted by date, and col1 is just some random value.
TEST_schema = StructType([StructField("date", StringType(), True),\
StructField("col1", IntegerType(), True),\
])
TEST_data = [('2020-08-01',3),('2020-08-02',1),('2020-08-03',-1),('2020-08-04',-1),('2020-08-05',3),\
('2020-08-06',-1),('2020-08-07',6),('2020-08-08',4),('2020-08-09',5)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df.show()
+----------+----+
| date|col1|
+----------+----+
|2020-08-01| 3|
|2020-08-02| 1|
|2020-08-03| -1|
|2020-08-04| -1|
|2020-08-05| 3|
|2020-08-06| -1|
|2020-08-07| 6|
|2020-08-08| 4|
|2020-08-09| 5|
+----------+----+
LOGIC:lead(col1)+1,如果col1 ==-1,则从前一个值lead(col1)+2 ...
结果数据框将如下所示(想要的列是我想要的输出)
LOGIC : lead(col1) +1, if col1 ==-1, then from the previous value lead(col1) +2...
the resulted dataframe will look like this (want column is what i want as output)
+----------+----+----+
| date|col1|WANT|
+----------+----+----+
|2020-08-01| 3| 2|
|2020-08-02| 1| 6|
|2020-08-03| -1| 5|
|2020-08-04| -1| 4|
|2020-08-05| 3| 8|
|2020-08-06| -1| 7|
|2020-08-07| 6| 5|
|2020-08-08| 4| 6|
|2020-08-09| 5| -1|
+----------+----+----+
让我们看一下最后一行,其中col1 == 5,其中5领先于wan == 6的+1(2020-08-08) 如果我们有col ==-1,那么我们再加+1,如果我们有col ==-1重复两次,那么我们再加+2. 这很难用语言解释,最后因为它创建了最后一列而不是null,所以用-1代替.我有一个图
Let's look at last row, where col1==5, that 5 is leaded +1 which is in want==6 (2020-08-08) If we have col==-1, then we add +1 more ,, if we have col==-1 repeated twice, then we add +2 more.. this is hard to explain in words,lastly since it created last column instead of null, replace with -1. I have a diagram
推荐答案
您可以检查以下代码和逻辑是否对您有用:
You can check if the following code and logic works for you:
- 创建一个子组标签
g
,其总和为int(col1!=-1)
,我们仅关注col1 == -1的行,并取消所有其他行. - 残差为1,如果col1 == -1,加上Window
w2
上的运行计数 - 通过不等于-1的
w1
接受prev_col1(使用
- create a sub-group label
g
which take running sum ofint(col1!=-1)
, and we only concern about Rows with col1 == -1, and nullify all other Rows. - the residual is 1 and if col1 == -1, plus the running count on Window
w2
- take the prev_col1 over
w1
which is not -1 (using nullif), (the naming of prev_col1 might be confusion since it takes only if col1 = -1 using typical pyspark's way to do ffill, otherwise keep the original). - set val = prev_col1 + residual, take the lag and set null to -1
以下代码:
from pyspark.sql.functions import when, col, expr, count, desc, lag, coalesce
from pyspark.sql import Window
w1 = Window.orderBy(desc('date'))
w2 = Window.partitionBy('g').orderBy(desc('date'))
TEST_df.withColumn('g', when(col('col1') == -1, expr("sum(int(col1!=-1))").over(w1))) \
.withColumn('residual', when(col('col1') == -1, count('*').over(w2) + 1).otherwise(1)) \
.withColumn('prev_col1',expr("last(nullif(col1,-1),True)").over(w1)) \
.withColumn('want', coalesce(lag(expr("prev_col1 + residual")).over(w1),lit(-1))) \
.orderBy('date').show()
+----------+----+----+--------+---------+----+
| date|col1| g|residual|prev_col1|want|
+----------+----+----+--------+---------+----+
|2020-08-01| 3|null| 1| 3| 2|
|2020-08-02| 1|null| 1| 1| 6|
|2020-08-03| -1| 4| 3| 3| 5|
|2020-08-04| -1| 4| 2| 3| 4|
|2020-08-05| 3|null| 1| 3| 8|
|2020-08-06| -1| 3| 2| 6| 7|
|2020-08-07| 6|null| 1| 6| 5|
|2020-08-08| 4|null| 1| 4| 6|
|2020-08-09| 5|null| 1| 5| -1|
+----------+----+----+--------+---------+----+
这篇关于Pyspark:如何编写复杂的数据框计算潜在客户总和的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!