如何在Spark DataFrame、Scala中将行转换为列 [英] how to convert rows into columns in spark dataframe, scala

查看:12
本文介绍了如何在Spark DataFrame、Scala中将行转换为列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有没有办法将数据帧行转换成列。 我有以下结构作为输入:

val inputDF = Seq(("pid1","enc1", "bat"),
                  ("pid1","enc2", ""),
                  ("pid1","enc3", ""),
                  ("pid3","enc1", "cat"),
                  ("pid3","enc2", "")
              ).toDF("MemberID", "EncounterID", "entry" )

inputDF.show:

+--------+-----------+-----+
|MemberID|EncounterID|entry|
+--------+-----------+-----+
|    pid1|       enc1|  bat|
|    pid1|       enc2|     |
|    pid1|       enc3|     |
|    pid3|       enc1|  cat|
|    pid3|       enc2|     |
+--------+-----------+-----+

expected result:

+--------+----------+----------+----------+-----+
|MemberID|Encounter1|Encounter2|Encounter3|entry|
+--------+----------+----------+----------+-----+
|    pid1|      enc1|      enc2|      enc3|  bat|
|    pid3|      enc1|      enc2|      null|  cat|
+--------+----------+----------+----------+-----+

请建议是否有任何优化的直接API可用于将行转置为列。 我的输入数据量相当大,所以像收集这样的操作,我将无法执行,因为它将占用驱动程序上的所有数据。 我正在使用Spark 2.x

推荐答案

我不确定您需要的是您实际要求的内容。然而,为了以防万一,这里有一个想法:

val entries = inputDF.where('entry isNotNull)
    .where('entry !== "")
    .select("MemberID", "entry").distinct

val df = inputDF.groupBy("MemberID")
    .agg(collect_list("EncounterID") as "encounterList")
    .join(entries, Seq("MemberID"))
df.show
+--------+-------------------------+-----+
|MemberID|           encounterList |entry|
+--------+-------------------------+-----+
|    pid1|       [enc2, enc1, enc3]|  bat|
|    pid3|             [enc2, enc1]|  cat|
+--------+-------------------------+-----+

列表的顺序不是确定性的,但您可以对其进行排序,然后使用.withColumn("Encounter1", sort_array($"encounterList")(0))...

从中提取新列

其他想法

如果您想要的是将条目的值放入相应的"遭遇"列,您可以使用透视:

inputDF
    .groupBy("MemberID")
    .pivot("EncounterID", Seq("enc1", "enc2", "enc3"))
    .agg(first("entry")).show

+--------+----+----+----+
|MemberID|enc1|enc2|enc3|
+--------+----+----+----+
|    pid1| bat|    |    |
|    pid3| cat|    |    |
+--------+----+----+----+

添加Seq("enc1", "enc2", "enc3")是可选操作,但由于您知道该列的内容,因此可以加快计算速度。

这篇关于如何在Spark DataFrame、Scala中将行转换为列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆