How to match Dataframe column names to Scala case class attributes?


Question


The column names in this example from spark-sql come from the case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

https://spark.apache.org/docs/1.1.0/sql-programming-guide.html


However, in many cases the parameter names of the case class may change. If the file has not been updated to reflect the change, the columns will no longer be found.
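For illustration (a hypothetical scenario, not from the original question): suppose the field name were renamed to fullName. Selecting the new name from a Parquet file written under the old schema then fails:

// Hypothetical rename: the case class field "name" becomes "fullName"
case class Person(fullName: String, age: Int)

val df = sqlContext.read.parquet("people.parquet") // file still has columns "name" and "age"
df.select("fullName") // fails: no column "fullName" exists in the file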

How do I specify an appropriate mapping?

I was thinking of something like

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))

val ps: Seq[Person] = ???

// createDataFrame with an explicit schema expects an RDD[Row]
val personRDD = sc.parallelize(ps).map(p => Row(p.name, p.age))

// Apply the schema to the RDD.
val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)

Answer


Basically, all the mapping you need to do can be achieved with DataFrame.select(...). (Here I assume that no type conversions are needed.) Given the forward and backward mappings as Maps, the essential part is

val mapping = from.map { case (oldName, newName) => personsDF(oldName).as(newName) }.toArray
// personsDF is your original dataframe
val mappedDF = personsDF.select(mapping: _*)


where mapping is an array of Columns with aliases.
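A complete, self-contained example: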

object Example {

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]): Unit = {
    // init
    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet(personsDF, "persons.parquet", sc, sqlContext)

    val otherPersonDF = readParquet("persons.parquet", sc, sqlContext)
  }

  def writeParquet(personsDF: DataFrame, path: String, sc: SparkContext, sqlContext: SQLContext): Unit = {
    import Mapping.from

    // rename "name" -> "a" and "age" -> "b" before writing
    val mapping = from.map { case (oldName, newName) => personsDF(oldName).as(newName) }.toArray

    val mappedDF = personsDF.select(mapping: _*)
    mappedDF.write.parquet(path) // parquet with columns "a" and "b"
  }

  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext): DataFrame = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns "a" and "b"

    // rename "a" -> "name" and "b" -> "age" after reading
    val mapping = to.map { case (oldName, newName) => df(oldName).as(newName) }.toArray
    df.select(mapping: _*)
  }
}


Remark

If you need to convert a dataframe back to an RDD[Person], then

val rdd: RDD[Row] = personsDF.rdd
val personsRDD: RDD[Person] = rdd.map { r: Row =>
  Person(r.getAs[String]("name"), r.getAs[Int]("age"))
}
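As a side note (not part of the original answer): from Spark 1.6 on, the Dataset API can perform this conversion directly, assuming the column names already match the case class fields. A minimal sketch:

import sqlContext.implicits._ // provides the Encoder for Person

// Spark 1.6+: convert the DataFrame to a typed Dataset, then to an RDD
val personsDS = personsDF.as[Person]
val typedRDD: RDD[Person] = personsDS.rdd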

