Derive multiple columns from a single column in a Spark DataFrame


Question

I have a DataFrame with a huge parseable metadata string as a single column; let's call it DFA, with column ColmnA.

I would like to break this column, ColmnA, into multiple columns through a function, ClassXYZ = Func1(ColmnA). This function returns a class ClassXYZ with multiple variables, and each of these variables now has to be mapped to a new column, such as ColmnA1, ColmnA2 etc.

How would I do such a transformation from one DataFrame to another with these additional columns by calling Func1 just once, rather than repeating it to create all the columns?

It is easy to solve if I call this huge function every time to add a new column, but that is what I wish to avoid.

Kindly advise with working or pseudo code.

Thanks

Sanjay

Answer

Generally speaking, what you want is not directly possible. A UDF can return only a single column at a time. There are two different ways you can overcome this limitation:


1. Return a column of complex type. The most general solution is a StructType, but you can consider ArrayType or MapType as well:

import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
)).toDF("x", "y", "z")

case class Foobar(foo: Double, bar: Double)

// UDF that packs both derived values into a single struct column
val foobarUdf = udf((x: Long, y: Double, z: String) =>
  Foobar(x * y, z.head.toInt * y))

val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
df1.show
// +---+----+---+------------+
// |  x|   y|  z|      foobar|
// +---+----+---+------------+
// |  1| 3.0|  a| [3.0,291.0]|
// |  2|-1.0|  b|[-2.0,-98.0]|
// |  3| 0.0|  c|   [0.0,0.0]|
// +---+----+---+------------+

df1.printSchema
// root
//  |-- x: long (nullable = false)
//  |-- y: double (nullable = false)
//  |-- z: string (nullable = true)
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: double (nullable = false)
//  |    |-- bar: double (nullable = false)

This can easily be flattened later, but usually there is no need for that.
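If you do want the struct's fields as separate top-level columns, a minimal sketch is to expand the struct with a star select on `df1` from above (`flat` is a hypothetical name for the result):

```scala
// "foobar.*" expands every field of the struct into its own column,
// yielding top-level columns foo and bar alongside x, y and z.
val flat = df1.select($"x", $"y", $"z", $"foobar.*")
```

Individual fields can also be pulled out one at a time, e.g. `df1.select($"foobar.foo")`.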

2. Switch to RDD, reshape and rebuild the DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Computes the extra column values for a single row
def foobarFunc(x: Long, y: Double, z: String): Seq[Any] =
  Seq(x * y, z.head.toInt * y)

// Extend the original schema with the two new fields
val schema = StructType(df.schema.fields ++
  Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))

val rows = df.rdd.map(r => Row.fromSeq(
  r.toSeq ++
  foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))

val df2 = sqlContext.createDataFrame(rows, schema)

df2.show
// +---+----+---+----+-----+
// |  x|   y|  z| foo|  bar|
// +---+----+---+----+-----+
// |  1| 3.0|  a| 3.0|291.0|
// |  2|-1.0|  b|-2.0|-98.0|
// |  3| 0.0|  c| 0.0|  0.0|
// +---+----+---+----+-----+

