如何火花SchemaRDD转换成我的案子类RDD? [英] How to convert spark SchemaRDD into RDD of my case class?
问题描述
在火花的文档很清楚如何从 RDD
创建拼花文件自己的case类; (从文档)
In the spark docs it's clear how to create parquet files from RDD
of your own case classes; (from the docs)
val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
但尚不清楚如何转换回来,真是我们想要的方法 readParquetFile
在这里我们可以做的:
val people: RDD[Person] = sc.readParquestFile[Person](path)
,其中被定义的情况下类的那些值是那些由方法读
where those values of the case class are defined are those which are read by the method.
推荐答案
最好的解决方案,我想出了一个需要复制和粘贴新课程的最低金额如下(我还是想看到另一个解决方案虽然)
The best solution I've come up with that requires the least amount of copy and pasting for new classes is as follows (I'd still like to see another solution though)
首先,你必须确定你的情况类,(部分)可重复使用的工厂方法
First you have to define your case class, and a (partially) reusable factory method
import org.apache.spark.sql.catalyst.expressions
case class MyClass(fooBar: Long, fred: Long)
// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T =
fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
一些锅炉板,这将已经有
Some boiler plate which will already be available
import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
魔术
import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD
def camelToUnderscores(name: String) =
"[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())
def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)
def caseClassToSQLCols[T: TypeTag]: List[String] =
getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)
def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
val tmpName = "tmpTableName" // Maybe should use a random string
schemaRDD.registerAsTable(tmpName)
sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
.map(fac)
}
使用示例
val parquetFile = sqlContext.parquetFile(path)
val normalRDD: RDD[MyClass] =
schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))
另请参阅:
<一个href=\"http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html\">http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html
虽然我没能找到遵循JIRA链接的例子或文件。
Though I failed to find any example or documentation by following the JIRA link.
这篇关于如何火花SchemaRDD转换成我的案子类RDD?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!