分解(转置?)Spark SQL 表中的多列 [英] Explode (transpose?) multiple columns in Spark SQL table
问题描述
我正在使用 Spark SQL(我提到它在 Spark 中,以防影响 SQL 语法 - 我还不够熟悉,无法确定)并且我有一个我正在尝试重新构建的表,但是我在尝试同时转置多个列时遇到困难.
I am using Spark SQL (I mention that it is in Spark in case that affects the SQL syntax - I'm not familiar enough to be sure yet) and I have a table that I am trying to re-structure, but I'm getting stuck trying to transpose multiple columns at the same time.
基本上我的数据看起来像:
Basically I have data that looks like:
userId someString varA varB
1 "example1" [0,2,5] [1,2,9]
2 "example2" [1,20,5] [9,null,6]
并且我想同时分解 varA 和 varB(长度将始终保持一致) - 以便最终输出如下所示:
and I'd like to explode both varA and varB simultaneously (the length will always be consistent) - so that the final output looks like this:
userId someString varA varB
1 "example1" 0 1
1 "example1" 2 2
1 "example1" 5 9
2 "example2" 1 9
2 "example2" 20 null
2 "example2" 5 6
但我似乎只能得到一个单一的爆炸(var)语句在一个命令中工作,如果我尝试链接它们(即在第一个爆炸命令之后创建一个临时表)那么我显然得到了大量的重复的、不必要的行.
but I can only seem to get a single explode(var) statement to work in one command, and if I try to chain them (ie create a temp table after the first explode command) then I obviously get a huge number of duplicate, unnecessary rows.
非常感谢!
推荐答案
Spark >= 2.4
您可以跳过zip
udf
并使用arrays_zip
函数:
You can skip zip
udf
and use arrays_zip
function:
df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars.varA", $"vars.varB").show
火花<2.4
如果没有自定义 UDF,您想要的东西是不可能的.在 Scala 中,您可以执行以下操作:
What you want is not possible without a custom UDF. In Scala you could do something like this:
val data = sc.parallelize(Seq(
"""{"userId": 1, "someString": "example1",
"varA": [0, 2, 5], "varB": [1, 2, 9]}""",
"""{"userId": 2, "someString": "example2",
"varA": [1, 20, 5], "varB": [9, null, 6]}"""
))
val df = spark.read.json(data)
df.printSchema
// root
// |-- someString: string (nullable = true)
// |-- userId: long (nullable = true)
// |-- varA: array (nullable = true)
// | |-- element: long (containsNull = true)
// |-- varB: array (nullable = true)
// | |-- element: long (containsNull = true)
现在我们可以定义zip
udf:
Now we can define zip
udf:
import org.apache.spark.sql.functions.{udf, explode}
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show
// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// | 1| example1| 0| 1|
// | 1| example1| 2| 2|
// | 1| example1| 5| 9|
// | 2| example2| 1| 9|
// | 2| example2| 20|null|
// | 2| example2| 5| 6|
// +------+----------+----+----+
使用原始 SQL:
sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
这篇关于分解(转置?)Spark SQL 表中的多列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!