How to convert Spark SchemaRDD into RDD of my case class?


Question

In the Spark docs it's clear how to create parquet files from an RDD of your own case classes (from the docs):

val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
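
Note the asymmetry: reading the file back with sqlContext.parquetFile only gives a SchemaRDD of generic Rows, so the Person type is lost (a sketch, assuming the Person(name, age) case class from the docs):

// parquetFile returns a SchemaRDD of untyped Rows, not an RDD[Person];
// every field access needs an index and a cast.
val rows = sqlContext.parquetFile("people.parquet")
rows.map(row => row(0).asInstanceOf[String]) // assumes field 0 is the name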

But it's not clear how to convert back. What we really want is a method readParquetFile where we could do:

val people: RDD[Person] = sc.readParquetFile[Person](path)

where the values of the case class are those which are read by the method.

Answer

The best solution I've come up with that requires the least amount of copying and pasting for new classes is as follows (I'd still like to see another solution though).

First you have to define your case class, and a (partially) reusable factory method:

import org.apache.spark.sql.catalyst.expressions

case class MyClass(fooBar: Long, fred: Long)

// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
  // Builds a T from a Row whose first two columns are Longs
  def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T =
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
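
Because longLong is curried, partially applying it to the case class's apply method already yields the Row => MyClass function that the conversion step below expects (spelled out here for clarity):

// Eta-expanding MyClass.apply and fixing the first parameter list
// leaves a plain function from Row to MyClass.
val myClassFac: expressions.Row => MyClass =
  Factories.longLong[MyClass](MyClass.apply)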

Some boilerplate which will already be available:

import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD // implicit RDD -> SchemaRDD conversion

The magic:

import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD

// Converts camelCase field names to the snake_case column names in the table
def camelToUnderscores(name: String) =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

// Lists the case accessor methods of T, rendered as strings like "value fooBar"
def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

// Maps the case class field names to SQL column names
def caseClassToSQLCols[T: TypeTag]: List[String] =
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)

// Registers the SchemaRDD as a temp table, selects the columns in case class
// order, and applies the factory to turn each Row into a T
def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
  val tmpName = "tmpTableName" // Maybe should use a random string
  schemaRDD.registerAsTable(tmpName)
  sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
    .map(fac)
}
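
For what it's worth, the helpers should behave roughly like this (assuming members.sorted returns the case accessors in declaration order):

camelToUnderscores("fooBar") // "foo_bar"
caseClassToSQLCols[MyClass]  // List("foo_bar", "fred")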

Example use:

val parquetFile = sqlContext.parquetFile(path)

val normalRDD: RDD[MyClass] = 
  schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))
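
The result is an ordinary RDD[MyClass], so typed field access works again, for example (my illustration, not part of the original answer):

// No indices or casts: the compiler knows the fields of MyClass.
val total = normalRDD.map(_.fooBar).reduce(_ + _)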

See also:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html

Though I failed to find any example or documentation by following the JIRA link.
