Spark 数据框将多行转换为列 [英] Spark dataframe transform multiple rows to column

查看:34
本文介绍了Spark 数据框将多行转换为列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是一个新手,我想在源数据帧下面转换(从 JSON 文件加载):

I am a novice to spark, and I want to transform below source dataframe (load from JSON file):

+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|
+--+-----+-----+

进入结果数据框:

+--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 2| 1| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+

这是转换规则:

  1. 结果数据框由 A + (n major columns) 组成,其中 major 列名称由:

  1. The result dataframe is consisted with A + (n major columns) where the major columns names are specified by:

sorted(src_df.map(lambda x: x[2]).distinct().collect())

  • 结果数据帧包含 m 行,其中 A 列的值由:

  • The result dataframe contains m rows where the values for A column are provided by:

    sorted(src_df.map(lambda x: x[0]).distinct().collect())
    

  • 结果数据帧中每个主要列的值是来自相应A 和主要数据帧的源数据帧的值(例如,源数据帧中第 1 行的计数映射到 box,其中 Aa 和列 m1)

  • The value for each major column in result dataframe is the value from source dataframe on the corresponding A and major (e.g. the count in Row 1 in source dataframe is mapped to the box where A is a and column m1)

    源数据帧中Amajor的组合没有重复(请把它当作SQL中两列的主键)

    The combinations of A and major in source dataframe has no duplication (please consider it a primary key on the two columns in SQL)

    推荐答案

    让我们从示例数据开始:

    Lets start with example data:

    df = sqlContext.createDataFrame([
        ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
        ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
        ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
        ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
        ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
        ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
        ("e", 1, "m4"), ("e", 1, "m5")], 
        ("a", "cnt", "major"))
    

    请注意,我已将 count 更改为 cnt.Count 是大多数 SQL 方言中的保留关键字,它不是列名的好选择.

    Please note that I've changed count to cnt. Count is a reserved keyword in most of the SQL dialects and it is not a good choice for a column name.

    至少有两种方法可以重塑这些数据:

    There are at least two ways to reshape this data:

    • 在 DataFrame 上聚合

    • aggregating over DataFrame

    from pyspark.sql.functions import col, when, max
    
    majors = sorted(df.select("major")
        .distinct()
        .map(lambda row: row[0])
        .collect())
    
    cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) 
        for m in  majors]
    maxs = [max(col(m)).alias(m) for m in majors]
    
    reshaped1 = (df
        .select(col("a"), *cols)
        .groupBy("a")
        .agg(*maxs)
        .na.fill(0))
    
    reshaped1.show()
    
    ## +---+---+---+---+---+---+
    ## |  a| m1| m2| m3| m4| m5|
    ## +---+---+---+---+---+---+
    ## |  a|  1|  1|  2|  3|  0|
    ## |  b|  4|  1|  2|  0|  0|
    ## |  c|  3|  0|  4|  5|  0|
    ## |  d|  6|  1|  2|  3|  4|
    ## |  e|  4|  5|  1|  1|  1|
    ## +---+---+---+---+---+---+
    

  • groupBy RDD

    from pyspark.sql import Row
    
    grouped = (df
        .map(lambda row: (row.a, (row.major, row.cnt)))
        .groupByKey())
    
    def make_row(kv):
        k, vs = kv
        tmp = dict(list(vs) + [("a", k)])
        return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})
    
    reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))
    
    reshaped2.show()
    
    ## +---+---+---+---+---+---+
    ## |  a| m1| m2| m3| m4| m5|
    ## +---+---+---+---+---+---+
    ## |  a|  1|  1|  2|  3|  0|
    ## |  e|  4|  5|  1|  1|  1|
    ## |  c|  3|  0|  4|  5|  0|
    ## |  b|  4|  1|  2|  0|  0|
    ## |  d|  6|  1|  2|  3|  4|
    ## +---+---+---+---+---+---+
    

  • 这篇关于Spark 数据框将多行转换为列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆