How to match Dataframe column names to Scala case class attributes?
Question
The column names in this example from spark-sql come from the case class Person.
case class Person(name: String, age: Int)
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
https://spark.apache.org/docs/1.1.0/sql-programming-guide.html
However, in many cases the parameter names may change. Columns would then not be found if the file has not been updated to reflect the change.
How can I specify a suitable mapping?
I am thinking of something like:
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val ps: Seq[Person] = ???
val personRDD = sc.parallelize(ps)
// Apply the schema to the RDD.
val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
Recommended answer
Basically, all the mapping you need can be done with DataFrame.select(...). (Here I assume that no type conversions are needed.)
Given the forward and backward mappings as maps, the essential part is
val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
// personsDF your original dataframe
val mappedDF = personsDF.select( mapping: _* )
where mapping is an array of aliased Columns. A complete example:
object Example {

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    // case class attribute -> column name in the parquet file
    val from = Map("name" -> "a", "age" -> "b")
    // column name in the parquet file -> case class attribute
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]): Unit = {
    // init
    val conf = new SparkConf()
      .setAppName("Example.")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet(personsDF, "persons.parquet", sc, sqlContext)
    val otherPersonDF = readParquet("persons.parquet", sc, sqlContext)
  }

  def writeParquet(personsDF: DataFrame, path: String, sc: SparkContext, sqlContext: SQLContext): Unit = {
    import Mapping.from
    val mapping = from.map { (x: (String, String)) => personsDF(x._1).as(x._2) }.toArray
    val mappedDF = personsDF.select(mapping: _*)
    mappedDF.write.parquet(path) // parquet with columns "a" and "b"
  }

  // Returns the dataframe with the columns renamed back to "name" and "age".
  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext): DataFrame = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns a and b
    val mapping = to.map { (x: (String, String)) => df(x._1).as(x._2) }.toArray
    df.select(mapping: _*)
  }
}
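The example above keeps the forward and backward mappings as two hand-written maps, which can drift apart when an attribute is renamed. As a minimal sketch (the names MappingSketch and invert are hypothetical, not part of the original answer), the backward mapping could instead be derived from the forward one in plain Scala, assuming the mapping is one-to-one:

```scala
object MappingSketch {
  // Forward mapping: case class attribute -> column name stored in the file.
  val from: Map[String, String] = Map("name" -> "a", "age" -> "b")

  // Hypothetical helper: swaps keys and values, and fails fast if two
  // attributes map to the same column name (the mapping would not be invertible).
  def invert(m: Map[String, String]): Map[String, String] = {
    val inverted = m.map(_.swap)
    require(inverted.size == m.size, "mapping is not one-to-one, cannot invert")
    inverted
  }

  // Backward mapping: column name stored in the file -> case class attribute.
  val to: Map[String, String] = invert(from)
}
```

With this, only the forward map needs to be edited when a case class attribute is renamed; MappingSketch.to could then be used wherever Mapping.to is used above.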
Remark
If you need to convert a dataframe back to an RDD[Person], then
// requires: import org.apache.spark.sql.Row
val rdd: RDD[Row] = personsDF.rdd
val personsRDD: RDD[Person] = rdd.map { r: Row =>
  Person(r.getAs[String]("name"), r.getAs[Int]("age"))
}
Alternative
Also have a look at How to convert spark SchemaRDD into RDD of my case class?