How to match Dataframe column names to Scala case class attributes?


Question

The column names in this example from spark-sql come from the case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

However, in many cases the parameter names may change. This would cause columns not to be found if the file has not been updated to reflect the change.
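
For instance, a minimal sketch of the failure mode (the renamed field names here are hypothetical):

// Hypothetical rename: the case class used to be Person(name: String, age: Int)
case class Person(fullName: String, years: Int)

// people.parquet was written with the old field names,
// so the file still has columns "name" and "age"
val df = sqlContext.read.parquet("people.parquet")
df.select("fullName") // fails with an AnalysisException: cannot resolve 'fullName'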

How can a suitable mapping be specified?

I was thinking:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val ps: Seq[Person] = ???

// createDataFrame with an explicit schema expects an RDD[Row]
val personRDD = sc.parallelize(ps).map(p => Row(p.name, p.age))

// Apply the schema to the RDD.
val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)

Answer

Basically, all the mapping you need to do can be achieved with DataFrame.select(...). (Here I assume that no type conversions need to be done.) Given the forward and backward mappings as Maps, the essential part is

val mapping = from.map{ (x: (String, String)) => personsDF(x._1).as(x._2) }.toArray
// personsDF is your original dataframe
val mappedDF = personsDF.select(mapping: _*)

where mapping is an array of Columns with aliases.

object Example {

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]) : Unit = {
    // init
    val conf = new SparkConf()
      .setAppName( "Example." )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet( personsDF, "persons.parquet", sc, sqlContext)

    val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
  }

  def writeParquet(personsDF: DataFrame, path: String, sc: SparkContext, sqlContext: SQLContext): Unit = {
    import Mapping.from

    // rename "name" -> "a" and "age" -> "b" before writing
    val mapping = from.map{ (x: (String, String)) => personsDF(x._1).as(x._2) }.toArray

    val mappedDF = personsDF.select(mapping: _*)
    mappedDF.write.parquet(path) // parquet with columns "a" and "b"
  }

  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext): DataFrame = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns "a" and "b"

    // rename "a" -> "name" and "b" -> "age" after reading
    val mapping = to.map{ (x: (String, String)) => df(x._1).as(x._2) }.toArray
    df.select(mapping: _*)
  }
}
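
If all you need is the renaming, the alias array can also be avoided; a minimal sketch using withColumnRenamed and toDF (both part of the DataFrame API since Spark 1.3), with the same column names as above:

// rename one column at a time
val written = personsDF
  .withColumnRenamed("name", "a")
  .withColumnRenamed("age", "b")

// or rename all columns at once, by position
val written2 = personsDF.toDF("a", "b")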

Remark

If you need to convert a dataframe back to an RDD[Person], then

val rdd: RDD[Row] = personsDF.rdd
val personsRDD: RDD[Person] = rdd.map { r: Row =>
  Person(r.getAs[String]("name"), r.getAs[Int]("age")) // fields are looked up by column name
}

Alternative

Also take a look at How to convert spark SchemaRDD into RDD of my case class?
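
On Spark 2.x there is also the Dataset API; once the columns carry the case class field names again, the conversion back needs no hand-written mapping. A minimal sketch (the SparkSession value `spark` is an assumption here):

// Spark 2.x sketch: `spark` is assumed to be an existing SparkSession
import spark.implicits._

val personsDS = spark.read.parquet("persons.parquet") // columns "a" and "b"
  .withColumnRenamed("a", "name")
  .withColumnRenamed("b", "age")
  .as[Person] // Dataset[Person]; fields are matched by column name

val personsRDD: RDD[Person] = personsDS.rdd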

