How to maintain sort order in PySpark collect_list and collect multiple lists


Question

I want to maintain the date sort order, using collect_list for multiple columns, all with the same date order. I'll need them in the same dataframe so I can use them to create a time series model input. Below is a sample of the "train_data":

(train_data sample shown as a screenshot in the original post)

I'm using a Window with partitionBy to ensure sort order by tuning_evnt_start_dt for each Syscode_Stn. I can create one column with this code:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

# Cumulative, date-ordered list per Syscode_Stn; groupBy/max keeps the full list
sorted_list_df = train_data \
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)) \
    .groupBy('Syscode_Stn') \
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'))
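
For intuition, here is a minimal sketch of what that window produces, on made-up toy data (the rows and values below are hypothetical stand-ins, not the real train_data): collect_list over an ordered window is cumulative, so each row carries a growing prefix of the final list, which is why the groupBy/max step is needed afterwards.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical toy rows standing in for train_data
toy = spark.createDataFrame(
    [('A', '2019-01-01', 0.5),
     ('A', '2019-01-02', 0.7),
     ('A', '2019-01-03', 0.2)],
    ['Syscode_Stn', 'tuning_evnt_start_dt', 'spp_imp_daily'])

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

# The default frame of an ordered window runs from the start of the
# partition to the current row, so each row gets a running prefix:
# [0.5], then [0.5, 0.7], then [0.5, 0.7, 0.2]
toy.withColumn('lists', F.collect_list('spp_imp_daily').over(w)).show(truncate=False)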

But how do I create two columns in the same new dataframe?

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data \
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)) \
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w)) \
    .groupBy('Syscode_Stn') \
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'))

Note that MarchMadInd is not shown in the screenshot, but is included in train_data. Explanation of how I got to where I am: https://stackoverflow.com/a/49255498/8691976

Answer

Yes, the correct way is to add successive .withColumn statements, followed by a .agg statement that removes the duplicates for each array. Because the ordered window makes collect_list cumulative, every row in a group holds a prefix of the final list, so F.max keeps the lexicographically greatest prefix, which is the complete, date-ordered array.

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data \
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)) \
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w)) \
    .groupBy('Syscode_Stn') \
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
         F.max('MarchMadInd').alias('MarchMadInd'))
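
As a quick check, running the same pattern end-to-end on the hypothetical toy data from the sketch above (extended with a MarchMadInd column) returns both lists in the same date order:

# Hypothetical toy rows again, now with a MarchMadInd column added
toy2 = spark.createDataFrame(
    [('A', '2019-01-01', 0.5, 0),
     ('A', '2019-01-02', 0.7, 1),
     ('A', '2019-01-03', 0.2, 0)],
    ['Syscode_Stn', 'tuning_evnt_start_dt', 'spp_imp_daily', 'MarchMadInd'])

result = toy2 \
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)) \
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w)) \
    .groupBy('Syscode_Stn') \
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
         F.max('MarchMadInd').alias('MarchMadInd'))

result.show(truncate=False)
# +-----------+---------------+-----------+
# |Syscode_Stn|spp_imp_daily  |MarchMadInd|
# +-----------+---------------+-----------+
# |A          |[0.5, 0.7, 0.2]|[0, 1, 0]  |
# +-----------+---------------+-----------+

Both arrays come back aligned element by element, because both were collected over the same date-ordered window.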

