Pyspark Spark DataFrame-聚合和过滤地图类型列中的列 [英] Pyspark Spark DataFrame - Aggregate and filter columns in map type column

查看:285
本文介绍了Pyspark Spark DataFrame-聚合和过滤地图类型列中的列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的 DataFrame 看起来像:

| c1 | c2|  c3  |
|----+---+-------
| A  | b | 22:00| 
| A  | b | 23:00|
| A  | b | 09:00|
| A  | c | 22:00|
| B  | c | 09:30|

我想执行一些聚合并创建第二个 DataFrame 有3列:

I would like to perform some aggregations and create a second DataFrame with 3 columns:

c1 :是我要分组的列。

map_category_room_date :地图类型,键入 c2 并在 c3中设置较低/最小值

map_category_room_date: map type, key the c2 and value the lower/min value in c3.

cnt_orig :是原始组有多少行的计数。

cnt_orig: is the count on how many rows the original group had.

结果

Result

|    c1    |  map_category_room_date | cnt_orig |
|----------+-------------------------+----------|
|   'A'    |{'b': 09:00, 'C': 22:00} |    4     |
|   'B'    |{'c': 09:30}             |    1     |

我可以使用哪些汇总函数来归档,这是最简单的方法?

What aggregate functions can I use to archive this is the most simple way?

谢谢

推荐答案

您可以窗口函数生成 count ,然后使用内置函数通过执行以下操作来获取所需的最终数据框

You can window function to generate the count, then use inbuilt functions to get the final dataframe you desire by doing to following

from pyspark.sql import Window
windowSpec = Window.partitionBy("c1")

from pyspark.sql import functions as F
df.withColumn("cnt_orig", count('c1').over(windowSpec)).orderBy('c3').groupBy("c1", "c2", "cnt_orig").agg(first('c3').as('c3'))
    .withColumn("c2", F.regexp_replace(F.regexp_replace(F.array($"c2", $"c3").cast(StringType), "[\\[\\]]", ""), ",", " : "))
      .groupBy("c1", "cnt_orig").agg(F.collect_list("c2").as('map_category_room_date'))

您应该得到以下结果

+---+--------+----------------------+
|c1 |cnt_orig|map_category_room_date|
+---+--------+----------------------+
|A  |4       |[b : 09:00, c : 22:00]|
|b  |1       |[c : 09:00]           |
+---+--------+----------------------+

Scala方式

工作代码可获取所需的代码Scala中的输出是

working code to get the desired output in scala is

val windowSpec = Window.partitionBy("c1")

df.withColumn("cnt_orig", count("c1").over(windowSpec)).orderBy("c3").groupBy("c1", "c2", "cnt_orig").agg(first("c3").as("c3"))
    .withColumn("c2", regexp_replace(regexp_replace(array($"c2", $"c3").cast(StringType), "[\\[\\]]", ""), ",", " : "))
      .groupBy("c1", "cnt_orig").agg(collect_list("c2").as("map_category_room_date"))

这篇关于Pyspark Spark DataFrame-聚合和过滤地图类型列中的列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆