How to create rows and increment them in a given df in pyspark


Question

What I want is to create a new row based on the given dataframe I have, which looks like the following:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_date

TEST_schema = StructType([StructField("date", StringType(), True),
                          StructField("col1", IntegerType(), True),
                          StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-17',0,0),('2020-08-18',2,1),('2020-08-19',0,2),('2020-08-20',3,0),('2020-08-21',4,2),
             ('2020-08-22',1,3),('2020-08-23',2,2),('2020-08-24',1,2),('2020-08-25',3,1)]
rdd3 = sc.parallelize(TEST_data)  # not used below; kept from the original snippet
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df = TEST_df.withColumn("date", to_date("date", 'yyyy-MM-dd'))
TEST_df.show()

+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-17|   0|   0|
|2020-08-18|   2|   1|
|2020-08-19|   0|   2|
|2020-08-20|   3|   0|
|2020-08-21|   4|   2|
|2020-08-22|   1|   3|
|2020-08-23|   2|   2|
|2020-08-24|   1|   2|
|2020-08-25|   3|   1|
+----------+----+----+

Let's say I want to calculate for today's date, which is current_date(), and I want to compute col1 as follows: if col1 > 0, return col1 + col2, otherwise 0, where date == yesterday's date, i.e. current_date() - 1.

And compute col2 as: coalesce(lag(col2), 0).

So my result dataframe would look something like this:

+----------+----+----+
|      date|col1|want|
+----------+----+----+
|2020-08-17|   0|   0|
|2020-08-18|   2|   0|
|2020-08-19|   0|   1|
|2020-08-20|   3|   2|
|2020-08-21|   4|   0|
|2020-08-22|   1|   2|
|2020-08-23|   2|   3|
|2020-08-24|   1|   2|
|2020-08-25|   3|   2|
|2020-08-26|   4|   1|
+----------+----+----+

This would be easy with a withColumn (column-based) approach, but I want to know how to do it with rows. My initial idea is to calculate column by column first, then transpose it and make it row-based.
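For example, column-wise I could imagine something roughly like the sketch below (the next_col1/next_col2 names are only for illustration, and w is just a window ordered by date):

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.orderBy("date")

# column-based sketch of the rule: next col1 = col1 + col2 if col1 > 0 else 0,
# next col2 = coalesce(lag(col2), 0)
sketch = TEST_df \
    .withColumn("next_col1", F.when(F.col("col1") > 0, F.col("col1") + F.col("col2")).otherwise(F.lit(0))) \
    .withColumn("next_col2", F.coalesce(F.lag("col2").over(w), F.lit(0)))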

Answer

IIUC, you can try the following:

Step 1: create a new dataframe with a single row that has current_date() as the date and nulls for col1 and col2, then union it back to TEST_df (note: change all 2020-08-26 to current_date() in your final code):

df_new = TEST_df.union(spark.sql("select '2020-08-26', null, null")) 
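If you'd rather not hard-code the date, here is a minimal sketch of the same one-row union built with current_date(), assuming TEST_df has exactly the columns date, col1, col2:

from pyspark.sql.functions import current_date, lit

# single-row dataframe: date = today, col1/col2 = null (typed to match TEST_df)
extra_row = spark.range(1).select(
    current_date().alias("date"),
    lit(None).cast("int").alias("col1"),
    lit(None).cast("int").alias("col2")
)
df_new = TEST_df.union(extra_row)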

Edit: in practice the data are partitioned and each partition should get one added row; in that case you can do something like the following:

from pyspark.sql.functions import current_date, col, lit

# columns used for Window partitionBy (pcol1/pcol2 are placeholders for your real partition columns)
cols_part = ['pcol1', 'pcol2']

df_today = TEST_df.select([
    (current_date() if c == 'date' else col(c) if c in cols_part else lit(None)).alias(c)
        for c in TEST_df.columns
]).distinct()

df_new = TEST_df.union(df_today)

Step 2: do the calculations to fill in the null values added above:

df_new.selectExpr(
  "date", 
  "IF(date < '2020-08-26', col1, lag(IF(col1>0, col1+col2,0)) over(order by date)) as col1",
  "lag(col2,1,0) over(order by date) as col2"
).show()
+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-17|   0|   0|
|2020-08-18|   2|   0|
|2020-08-19|   0|   1|
|2020-08-20|   3|   2|
|2020-08-21|   4|   0|
|2020-08-22|   1|   2|
|2020-08-23|   2|   3|
|2020-08-24|   1|   2|
|2020-08-25|   3|   2|
|2020-08-26|   4|   1|
+----------+----+----+
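For reference, the same Step 2 logic can be written with the DataFrame API instead of selectExpr; this is only an equivalent sketch, again with 2020-08-26 standing in for current_date():

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when, lit

w = Window.orderBy("date")  # single global window ordered by date

result = df_new.select(
    "date",
    # keep historical col1 as-is; for today's row, take yesterday's IF(col1 > 0, col1 + col2, 0)
    when(col("date") < lit("2020-08-26"), col("col1"))
        .otherwise(lag(when(col("col1") > 0, col("col1") + col("col2")).otherwise(0)).over(w))
        .alias("col1"),
    # col2 is simply the previous row's col2, defaulting to 0
    lag("col2", 1, 0).over(w).alias("col2")
)
result.show()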

