How to convert parquet data to case classes with spark?


Problem description


I have a load of case classes which I've used in Spark to save data as Parquet, e.g.:

case class Person(userId: String,
              technographic: Option[Technographic] = None,
              geographic: Option[Geographic] = None)

case class Technographic(browsers: Seq[Browser], 
                     devices: Seq[Device],
                     oss: Seq[Os])

case class Browser(family: String,
               major: Option[String] = None,
               language: String)

...

How can I convert the data on disk back to these case classes?

I need to be able to select multiple columns and explode them so that for each list (e.g. browsers) all of the sub-lists have the same length.

E.g. given this original data:

Person(userId="1234",
  technographic=Some(Technographic(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en")),
    Browser(family=None, major=None, language=Some("en-us")),
    Browser(family=Some("Firefox), major=None, language=None)
  )),
  geographic=Some(Geographic(...))
)

I need the browser data, for example, to be as follows (while also being able to select all columns):

family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None

which I could get if Spark could explode each list item. Currently it will just do something like this (and anyway, explode won't work with multiple columns):

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
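
A minimal sketch of a workaround, assuming Spark 1.5's org.apache.spark.sql.functions.explode and the column names from the schema above: exploding the array of Browser structs element-wise keeps each element's fields aligned.

import org.apache.spark.sql.functions.explode

val df = sqlContext.read.parquet(inputPath)

// Explode the array of structs as whole elements, so family/major/language
// stay together per Browser instead of being flattened independently.
val browsers = df
  .select(explode(df("technographic.browsers")).as("b"))
  .select("b.family", "b.major", "b.language")
browsers.show()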

So how can I reconstruct a user's record (the entire set of case classes that produced a row of data) from all this nested optional data using Spark 1.5.2?

One possible approach is:

val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = sqlContext.sql("describe person")
sqlContext.sql("select * from person").map { x =>
  ... // somehow zip `fields` with the values so that I can
      // access values by column name instead of index
      // (which is brittle), but how?
}
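
A minimal sketch of the zipping idea, assuming the rows carry their schema (as DataFrame rows do in Spark 1.5); byName is a hypothetical helper, not an established API:

df.map { row =>
  // Pair each schema field name with the corresponding value, so columns
  // can be addressed by name instead of by brittle positional index.
  val byName: Map[String, Any] = row.schema.fieldNames.zip(row.toSeq).toMap
  byName("userId")
}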

Solution

Given

case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

case class Tech(browsers: Seq[Browser],
                devices: Seq[String],
                oss: Seq[String])


case class Person(userId: String,
                  tech: Option[Tech] = None,
                  geographic: Option[String] = None)

and some convenience types/functions for org.apache.spark.sql.Row:

// Spark returns array columns as WrappedArray; alias it for brevity.
type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
  // Null-safe lookup by column name: None when the column is null.
  def getOpt[T](n: String): Option[T] = {
    if (isNullAt(n)) {
      None
    } else {
      Some(r.getAs[T](n))
    }
  }

  def getStringOpt(n: String) = getOpt[String](n)
  def getString(n: String) = getStringOpt(n).get

  def getIntOpt(n: String) = getOpt[Int](n)
  def getInt(n: String) = getIntOpt(n).get

  def getArray[T](n: String) = r.getAs[A[T]](n)

  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String) = r.getAs[A[Row]](n)

  def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
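
For illustration, with the implicit class in scope any Row picks up these helpers (describe is a hypothetical function, not part of the answer):

def describe(r: Row): String = {
  val family = r.getString("family")  // throws if the column is null
  val major  = r.getIntOpt("major")   // None when the column is null
  s"family=$family, major=$major"
}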

then the parsing can be organized into a few functions:

def toBrowser(r: Row): Browser = {
  Browser(
    r.getString("family"),
    r.getIntOpt("major"),
    r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
  rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
  Tech(
    toBrowsers(r.getRows("browsers")),
    r.getArray[String]("devices"),
    r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
  // Option(null) is None, so a missing nested struct becomes None here.
  Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
  Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}

so you can write

df.map(toPerson).collect().foreach(println)


  • I have organized the parse functions into "stand-alone" methods. I'd normally put these either as apply methods in the companion object of the case class, or into an implicit value class for Row. The reason for using plain functions is that they are easier to paste into the spark-shell (see the sketch after this list)

  • Each parse function handles plain columns and arrays directly, but delegates to another function when it encounters a collection (Seq and Option - these represent the next nesting level)

  • The implicit class should extend AnyVal, but again that cannot be pasted into the spark-shell
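
A sketch of the companion-object variant from the first note; the placement is an assumption, since the answer itself only shows stand-alone functions:

import org.apache.spark.sql.Row

object Person {
  // An apply overload so a Row can be parsed with Person(row); it reuses
  // the stand-alone helpers defined above.
  def apply(r: Row): Person = Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}

df.map(row => Person(row)).collect().foreach(println)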
