Add rows of data to each group in a Spark dataframe


Question

I have this dataframe -

data = [(0,1,1,201505,3),
        (1,1,1,201506,5),
        (2,1,1,201507,7),
        (3,1,1,201508,2),
        (4,2,2,201750,3),
        (5,2,2,201751,0),
        (6,2,2,201752,1),
        (7,2,2,201753,1)
       ]
cols = ['id','item','store','week','sales']
data_df = spark.createDataFrame(data=data,schema=cols)
display(data_df)

What I want -

data_new = [(0,1,1,201505,3,0),
            (1,1,1,201506,5,0),
            (2,1,1,201507,7,0),
            (3,1,1,201508,2,0),
            (4,1,1,201509,0,0),
            (5,1,1,201510,0,0),
            (6,1,1,201511,0,0),
            (7,1,1,201512,0,0),
            (8,2,2,201750,3,0),
            (9,2,2,201751,0,0),
            (10,2,2,201752,1,0),
            (11,2,2,201753,1,0),
            (12,2,2,201801,0,0),
            (13,2,2,201802,0,0),
            (14,2,2,201803,0,0),
            (15,2,2,201804,0,0)]
cols_new = ['id','item','store','week','sales','flag',]
data_df_new = spark.createDataFrame(data=data_new,schema=cols_new)
display(data_df_new)
So, basically, I need 8 (could also be 6 or 10) weeks of data for every item-store grouping combination. As shown in the example, wherever a year's 52/53 weeks end, I need the week numbers to continue into the next year. I need this in PySpark, thanks in advance!

Answer

See my attempt below. It could have been written more compactly, but I felt it should be as explicit as possible so that I don't end up chaining the whole solution together. Code below:

import sys

from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import expr, col, lit, sequence, row_number

# The week-based letters in the 'yyyywwu' pattern below are only supported by the legacy parser on Spark 3+
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")


# Convert week of the year to date
s=data_df.withColumn("week", expr("cast (week as string)")).withColumn('date', F.to_date(F.concat("week",F.lit("6")), "yyyywwu"))


s = (s.groupby('item', 'store').agg(F.collect_list('sales').alias('sales'),F.collect_list('date').alias('date'))#Put sales and dates in an array
     .withColumn("id", sequence(lit(0), lit(6)))#Create sequence ids with the required expansion range per group
    )

#Explode the dataframe back, one row per element for each item/store combination
s =s.selectExpr('item','store','inline(arrays_zip(date,id,sales))')

#Create partition window broadcasting from start to end for each item/store combination
w = Window.partitionBy('item','store').orderBy('id').rowsBetween(-sys.maxsize, sys.maxsize)

#Create partition window for each item/store/date combination; used to number the null-date rows within each group
w1 = Window.partitionBy('item','store','date').orderBy('id').rowsBetween(Window.unboundedPreceding, Window.currentRow)

s=(s.withColumn('increment', F.when(col('date').isNull(),(row_number().over(w1))*7).otherwise(0))#Create increment values per item/store combination
   
   .withColumn('date1', F.when(col('date').isNull(),F.max('date').over(w)).otherwise(col('date')))#For null dates, take the last real date in the item/store combination
   
  )



# Compute the week of year and drop columns not wanted
s = s.withColumn("weekofyear", expr("weekofyear(date_add(date1, cast(increment as int)))")).drop('date','increment','date1').na.fill(0)


s.show(truncate=False)

Results

+----+-----+---+-----+----------+
|item|store|id |sales|weekofyear|
+----+-----+---+-----+----------+
|1   |1    |0  |3    |5         |
|1   |1    |1  |5    |6         |
|1   |1    |2  |7    |7         |
|1   |1    |3  |2    |8         |
|1   |1    |4  |0    |9         |
|1   |1    |5  |0    |10        |
|1   |1    |6  |0    |11        |
|2   |2    |0  |3    |50        |
|2   |2    |1  |0    |51        |
|2   |2    |2  |1    |52        |
|2   |2    |3  |1    |1         |
|2   |2    |4  |0    |2         |
|2   |2    |5  |0    |3         |
|2   |2    |6  |0    |4         |
+----+-----+---+-----+----------+
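
To get back to the six-column layout shown in the question (a constant flag column and week instead of weekofyear), a minimal follow-up sketch, not part of the original answer, could be appended; note that id here is the per-group sequence id produced above rather than a globally increasing id:

# Hypothetical follow-up: rename weekofyear to week and add the constant flag column
result = (s.withColumnRenamed('weekofyear', 'week')
           .withColumn('flag', F.lit(0))
           .select('id', 'item', 'store', 'week', 'sales', 'flag'))
result.show(truncate=False)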

