collect_list by preserving order based on another variable
Question
I am trying to create a new column of lists in Pyspark using a groupby aggregation on existing set of columns. An example input data frame is provided below:
------------------------
id | date | value
------------------------
1 |2014-01-03 | 10
1 |2014-01-04 | 5
1 |2014-01-05 | 15
1 |2014-01-06 | 20
2 |2014-02-10 | 100
2 |2014-03-11 | 500
2 |2014-04-15 | 1500
The expected output is:
id | value_list
------------------------
1 | [10, 5, 15, 20]
2 | [100, 500, 1500]
The values within a list are sorted by the date.
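For reference, the sample frame above can be rebuilt like this (a minimal sketch; the SparkSession name spark and the string type of the date column are assumptions, not part of the original question):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# recreate the example input data frame shown above
input_df = spark.createDataFrame(
    [(1, "2014-01-03", 10), (1, "2014-01-04", 5), (1, "2014-01-05", 15),
     (1, "2014-01-06", 20), (2, "2014-02-10", 100), (2, "2014-03-11", 500),
     (2, "2014-04-15", 1500)],
    ["id", "date", "value"])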
I tried using collect_list as follows:
from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
But collect_list doesn't guarantee order even if I sort the input data frame by date before aggregation.
Could someone help on how to do aggregation by preserving the order based on a second (date) variable?
Recommended answer
If you collect both dates and values as a list, you can sort the resulting column according to date using a udf, and then keep only the values in the result.
import operator
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType

# create a list column of (date, value) structs per id
grouped_df = input_df.groupby("id") \
    .agg(F.collect_list(F.struct("date", "value")) \
    .alias("list_col"))

# define a udf that sorts the structs by date and keeps only the values
def sorter(l):
    res = sorted(l, key=operator.itemgetter(0))
    return [item[1] for item in res]

# declare the return type so the result is an array column rather than a string
sort_udf = F.udf(sorter, ArrayType(IntegerType()))

# test
grouped_df.select("id", sort_udf("list_col") \
    .alias("sorted_list")) \
    .show(truncate=False)
+---+----------------+
|id |sorted_list |
+---+----------------+
|1 |[10, 5, 15, 20] |
|2 |[100, 500, 1500]|
+---+----------------+
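On newer Spark versions this can also be done without a Python udf. Below is a sketch under the assumption of Spark 2.4 or later, where the built-in sort_array orders the array of structs by its first field (date) and the transform higher-order function extracts the values; no_udf_df is a hypothetical name:
# built-in alternative (assumes Spark 2.4+): sort structs by date, keep values
no_udf_df = input_df.groupby("id") \
    .agg(F.collect_list(F.struct("date", "value")).alias("list_col")) \
    .select("id",
            F.expr("transform(sort_array(list_col), x -> x.value)")
             .alias("sorted_list"))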