How to create rows and increment them in a given df in PySpark
Question
What I want is to create a new row based on the given dataframe I have, which looks like the following:
from pyspark.sql.functions import to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

TEST_schema = StructType([
    StructField("date", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
])
TEST_data = [('2020-08-17',0,0),('2020-08-18',2,1),('2020-08-19',0,2),
             ('2020-08-20',3,0),('2020-08-21',4,2),('2020-08-22',1,3),
             ('2020-08-23',2,2),('2020-08-24',1,2),('2020-08-25',3,1)]
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df = TEST_df.withColumn("date", to_date("date", 'yyyy-MM-dd'))
TEST_df.show()
+----------+----+----+
| date|col1|col2|
+----------+----+----+
|2020-08-17| 0| 0|
|2020-08-18| 2| 1|
|2020-08-19| 0| 2|
|2020-08-20| 3| 0|
|2020-08-21| 4| 2|
|2020-08-22| 1| 3|
|2020-08-23| 2| 2|
|2020-08-24| 1| 2|
|2020-08-25| 3| 1|
+----------+----+----+
Let's say I want to calculate for today's date, which is current_date(), and I want to calculate col1 as follows: if col1 > 0, return col1 + col2, otherwise 0, where date == yesterday's date, which is going to be current_date() - 1.

Calculate col2 as follows: coalesce(lag(col2), 0).
So my result dataframe would be something like this:
+----------+----+----+
| date|col1|want|
+----------+----+----+
|2020-08-17| 0| 0|
|2020-08-18| 2| 0|
|2020-08-19| 0| 1|
|2020-08-20| 3| 2|
|2020-08-21| 4| 0|
|2020-08-22| 1| 2|
|2020-08-23| 2| 3|
|2020-08-24| 1| 2|
|2020-08-25| 3| 2|
|2020-08-26| 4| 1|
+----------+----+----+
This would be so easy if we used the withColumn (column-based) method, but I want to know how to do this with rows. My initial idea is to calculate by column first, then transpose it and make it row-based.
Answer

IIUC, you can try the following:
Step-1: create a new dataframe with a single row having current_date() as date and nulls for col1 and col2, then union it back to TEST_df (Note: change all 2020-08-26 to current_date() in your final code):
df_new = TEST_df.union(spark.sql("select '2020-08-26', null, null"))
Edit: practically, if the data are partitioned and each partition should add one row, you can do something like the following:
from pyspark.sql.functions import current_date, col, lit

# columns used for Window partitionBy
cols_part = ['pcol1', 'pcol2']

df_today = TEST_df.select([
    (current_date() if c == 'date' else col(c) if c in cols_part else lit(None)).alias(c)
    for c in TEST_df.columns
]).distinct()
df_new = TEST_df.union(df_today)
Step-2: do calculations to fill the above null values:
df_new.selectExpr(
    "date",
    "IF(date < '2020-08-26', col1, lag(IF(col1 > 0, col1 + col2, 0)) over (order by date)) as col1",
    "lag(col2, 1, 0) over (order by date) as col2"
).show()
+----------+----+----+
| date|col1|col2|
+----------+----+----+
|2020-08-17| 0| 0|
|2020-08-18| 2| 0|
|2020-08-19| 0| 1|
|2020-08-20| 3| 2|
|2020-08-21| 4| 0|
|2020-08-22| 1| 2|
|2020-08-23| 2| 3|
|2020-08-24| 1| 2|
|2020-08-25| 3| 2|
|2020-08-26| 4| 1|
+----------+----+----+