派生从单个列多列在Spark数据框 [英] Derive multiple columns from a single column in a Spark DataFrame
问题描述
我有一个巨大的解析的元数据在数据框一个字符串列DF,让我们把它称为DFA,与ColmnA。
I have a DF with a huge parseable metadata as a single string column in a Dataframe, lets call it DFA, with ColmnA.
我想破此列,ColmnA成多列直通功能,ClassXYZ = func1的(ColmnA)。这个函数返回一个类ClassXYZ,多变量,每个这些变量现在必须映射到新列,这样的ColmnA1,ColmnA2等。
I would like to break this column, ColmnA into multiple columns thru a function, ClassXYZ = Func1(ColmnA). This function returns a class ClassXYZ, with multiple variables, and each of these variables now has to be mapped to new Column, such a ColmnA1, ColmnA2 etc.
如何我会通过调用这个FUNC1只有一次做这样的转型,从1数据框到另一个这些附加列,不用重复,它创建的所有列。
How would I do such a transformation from 1 Dataframe to another with these additional columns by calling this Func1 just once, and not have to repeat-it to create all the columns.
它很容易解决,如果我每增加一个新的列时调用这个巨大作用,但我想避免的。
Its easy to solve if I were to call this huge function every time to add a new column, but that what I wish to avoid.
请请告知与工作或伪code。
Kindly please advise with a working or pseudo code.
感谢
桑杰
推荐答案
一般来说,你想要的是不能直接。 UDF在当时只返回一个列。有两种不同的方法可以克服这个限制:
Generally speaking what you want is not directly possible. UDF can return only a single column at the time. There are two different ways you can overcome this limitation:
-
返回复杂类型的列。最通用的解决方案是一个
StructType
,但你可以考虑将ArrayType
或地图类型
以及
import org.apache.spark.sql.functions.udf
val df = sc.parallelize(Seq(
(1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
)).toDF("x", "y", "z")
case class Foobar(foo: Double, bar: Double)
val foobarUdf = udf((x: Long, y: Double, z: String) =>
Foobar(x * y, z.head.toInt * y))
val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
df1.show
// +---+----+---+------------+
// | x| y| z| foobar|
// +---+----+---+------------+
// | 1| 3.0| a| [3.0,291.0]|
// | 2|-1.0| b|[-2.0,-98.0]|
// | 3| 0.0| c| [0.0,0.0]|
// +---+----+---+------------+
df1.printSchema
// root
// |-- x: long (nullable = false)
// |-- y: double (nullable = false)
// |-- z: string (nullable = true)
// |-- foobar: struct (nullable = true)
// | |-- foo: double (nullable = false)
// | |-- bar: double (nullable = false)
此以后可以容易地平坦化,但通常没有必要。
This can be easily flattened later but usually there is no need for that.
切换到RDD,重塑和重建DF:
Switch to RDD, reshape and rebuild DF:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
def foobarFunc(x: Long, y: Double, z: String): Seq[Any] =
Seq(x * y, z.head.toInt * y)
val schema = StructType(df.schema.fields ++
Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
val rows = df.rdd.map(r => Row.fromSeq(
r.toSeq ++
foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
val df2 = sqlContext.createDataFrame(rows, schema)
df2.show
// +---+----+---+----+-----+
// | x| y| z| foo| bar|
// +---+----+---+----+-----+
// | 1| 3.0| a| 3.0|291.0|
// | 2|-1.0| b|-2.0|-98.0|
// | 3| 0.0| c| 0.0| 0.0|
// +---+----+---+----+-----+
这篇关于派生从单个列多列在Spark数据框的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!