Pyspark Spark DataFrame - Aggregate and filter columns in map type column
Question
My DataFrame looks like:
| c1 | c2 | c3    |
|----|----|-------|
| A  | b  | 22:00 |
| A  | b  | 23:00 |
| A  | b  | 09:00 |
| A  | c  | 22:00 |
| B  | c  | 09:30 |
I would like to perform some aggregations and create a second DataFrame with 3 columns:

c1: the column I am grouping by.
map_category_room_date: map type, with c2 as the key and the lower/min value in c3 as the value.
cnt_orig: a count of how many rows the original group had.
Result
| c1  | map_category_room_date   | cnt_orig |
|-----|--------------------------|----------|
| 'A' | {'b': 09:00, 'c': 22:00} | 4        |
| 'B' | {'c': 09:30}             | 1        |
What aggregate functions can I use to achieve this in the simplest way?

Thanks
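To pin down the semantics before reaching for Spark, here is a plain-Python sketch of the desired aggregation (no Spark involved; ordinary dicts stand in for the map column):

```python
from collections import defaultdict

rows = [("A", "b", "22:00"), ("A", "b", "23:00"), ("A", "b", "09:00"),
        ("A", "c", "22:00"), ("B", "c", "09:30")]

min_c3 = defaultdict(dict)   # c1 -> {c2: min c3 seen so far}
cnt_orig = defaultdict(int)  # c1 -> row count of the whole group

for c1, c2, c3 in rows:
    cnt_orig[c1] += 1
    # "HH:MM" strings compare correctly lexicographically
    if c2 not in min_c3[c1] or c3 < min_c3[c1][c2]:
        min_c3[c1][c2] = c3

print(dict(min_c3))    # {'A': {'b': '09:00', 'c': '22:00'}, 'B': {'c': '09:30'}}
print(dict(cnt_orig))  # {'A': 4, 'B': 1}
```

This is exactly the expected-result table above: a per-group map of c2 to the minimum c3, plus the group's original row count.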
Answer
You can use a window function to generate the count, then use inbuilt functions to get the final dataframe you desire by doing the following:
from pyspark.sql import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy("c1")
result = (df.withColumn("cnt_orig", F.count("c1").over(windowSpec))
    .orderBy("c3").groupBy("c1", "c2", "cnt_orig").agg(F.first("c3").alias("c3"))
    .withColumn("c2", F.regexp_replace(F.regexp_replace(
        F.array("c2", "c3").cast("string"), "[\\[\\]]", ""), ",", " : "))
    .groupBy("c1", "cnt_orig").agg(F.collect_list("c2").alias("map_category_room_date")))
You should get the following result:
+---+--------+----------------------+
|c1 |cnt_orig|map_category_room_date|
+---+--------+----------------------+
|A  |4       |[b : 09:00, c : 22:00]|
|B  |1       |[c : 09:30]           |
+---+--------+----------------------+
Scala way

The working code to get the desired output in Scala is:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import spark.implicits._  // for the $"col" syntax

val windowSpec = Window.partitionBy("c1")
df.withColumn("cnt_orig", count("c1").over(windowSpec)).orderBy("c3").groupBy("c1", "c2", "cnt_orig").agg(first("c3").as("c3"))
  .withColumn("c2", regexp_replace(regexp_replace(array($"c2", $"c3").cast(StringType), "[\\[\\]]", ""), ",", " : "))
  .groupBy("c1", "cnt_orig").agg(collect_list("c2").as("map_category_room_date"))