How to convert parquet data to case classes with spark?


Problem description

I have a load of case classes which I've used in spark to save data as parquet, e.g.:

case class Person(userId: String,
                  technographic: Option[Technographic] = None,
                  geographic: Option[Geographic] = None)

case class Technographic(browsers: Seq[Browser],
                         devices: Seq[Device],
                         oss: Seq[Os])

case class Browser(family: String,
                   major: Option[String] = None,
                   language: String)

...

How can I convert the data on disk back to these case classes?

I need to be able to select multiple columns and explode them so that, for each list (e.g. browsers), all of the sub-lists have the same length.

For example, given this raw data:

Person(userId="1234",
  technographic=Some(Technographic(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en")),
    Browser(family=None, major=None, language=Some("en-us")),
    Browser(family=Some("Firefox), major=None, language=None)
  )),
  geographic=Some(Geographic(...))
)

I need, e.g., the browser data to be as follows (as well as being able to select all columns):

family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None

which I could get if spark could explode each list item. Currently it will just do something like (and anyway explode won't work with multiple columns):

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]

So how can I reconstruct a user's record (the entire set of case classes that produced a row of data) from all this nested optional data using spark 1.5.2?

One possible approach is:

val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = sqlContext.sql("desc person")
sqlContext.sql("select * from person").map { x =>
  ... // somehow zip `fields` with the values so that I can
      // access values by column name instead of index
      // (which is brittle), but how?
}
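For reference, a minimal sketch of the "zip fields with values" idea from the snippet above (illustrative only; it assumes the same sqlContext and inputPath, and only covers top-level columns). The answer below takes a different, fully typed approach.

val df = sqlContext.read.parquet(inputPath)
val fieldNames = df.schema.fieldNames          // column names, instead of "desc person"

// RDD of Map(columnName -> value), so values can be looked up by name
val byName = df.map(row => fieldNames.zip(row.toSeq).toMap)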

Recommended answer

Given

case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

case class Tech(browsers: Seq[Browser],
                devices: Seq[String],
                oss: Seq[String])


case class Person(userId: String,
                  tech: Option[Tech] = None,
                  geographic: Option[String] = None)

and the following helpers for org.apache.spark.sql.Row:

type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
  // generic accessor that maps SQL NULL to None
  def getOpt[T](n: String): Option[T] = {
    if (isNullAt(n)) {
      None
    } else {
      Some(r.getAs[T](n))
    }
  }

  def getStringOpt(n: String) = getOpt[String](n)
  def getString(n: String) = getStringOpt(n).get

  def getIntOpt(n: String) = getOpt[Int](n)
  def getInt(n: String) = getIntOpt(n).get

  // arrays come back as WrappedArray, nested structs as Row
  def getArray[T](n: String) = r.getAs[A[T]](n)

  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String) = r.getAs[A[Row]](n)

  // null check by column name instead of index
  def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
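As a quick illustrative check (hypothetical; it assumes a sqlContext and the case classes above are in scope), the helpers can be exercised on a one-row DataFrame:

val row = sqlContext.createDataFrame(Seq(Browser("IE", None, "en"))).first()

row.getString("family")   // "IE"
row.getIntOpt("major")    // None
row.isNullAt("major")     // true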

Then parsing can be organized in some functions:

def toBrowser(r: Row): Browser = {
  Browser(
    r.getString("family"),
    r.getIntOpt("major"),
    r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
  rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
  Tech(
    toBrowsers(r.getRows("browsers")),
    r.getArray[String]("devices"),
    r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
  Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
  Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}

so that you can write

df.map(toPerson).collect().foreach(println)
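To make the round trip concrete, here is a hypothetical spark-shell session (the path and the sample values are made up for illustration; sqlContext is assumed to be available):

val path = "/tmp/person.parquet"

sqlContext.createDataFrame(Seq(
  Person("1234",
    Some(Tech(
      browsers = Seq(Browser("IE", Some(7), "en"), Browser("Firefox", None, "fr")),
      devices  = Seq("iPhone"),
      oss      = Seq("iOS"))),
    geographic = Some("US"))
)).write.parquet(path)

val people: Array[Person] = sqlContext.read.parquet(path).map(toPerson).collect()
people.foreach(println)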


  • I have organized the parse functions into "stand-alone" methods. I'd normally put these either as apply into the companion object of the case class, or as implicit value classes for Row as well (see the sketch after this list). The reason for plain functions is that this is easier to paste into the spark-shell.

  • Each parse function handles plain columns and arrays directly, but delegates to another function when it encounters a collection (Seq and Option - these represent the next nesting level).

  • The implicit class should extend AnyVal, but again this cannot be pasted into the spark-shell.
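For completeness, a sketch (not from the original answer) of what the refactoring mentioned above could look like in a compiled project: the Row helpers become an implicit value class inside a wrapper object, and the parser becomes an apply on the case class's companion object. The name RowSyntax is made up for illustration.

import org.apache.spark.sql.Row

object RowSyntax {
  // value class: no wrapper object is allocated at runtime
  implicit class RichRow(val r: Row) extends AnyVal {
    def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
    def getOpt[T](n: String): Option[T] =
      if (isNullAt(n)) None else Some(r.getAs[T](n))
    def getIntOpt(n: String) = getOpt[Int](n)
  }
}

// same Browser as above, repeated so the sketch is self-contained
case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

object Browser {
  import RowSyntax._

  // companion-object variant of `toBrowser` above
  def apply(r: Row): Browser =
    Browser(r.getAs[String]("family"), r.getIntOpt("major"), r.getAs[String]("language"))
}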
