Spark Dataframe to Dataset of Java class


Question

I want to transform a DataFrame that is read in as JSON into a Dataset of a given class. So far, that worked well when I was able to write my own case classes:

case class MyCaseClass(...)

import spark.implicits._  // brings the implicit Encoders for case classes into scope
val df = spark.read.json("path/to/json")
val ds = df.as[MyCaseClass]

def myFunction(input: MyCaseClass): MyCaseClass = {
    // Do some validation and things
    input
}

ds.map(myFunction)

However, now I am bound to external Java classes (specifically, ones created by Thrift). So here is a more concrete example with a custom class:

The JSON:

{"a":1,"b":"1","wrapper":{"inside":"1.1", "map": {"k": "v"}}}
{"a":2,"b":"2","wrapper":{"inside":"2.1", "map": {"k": "v"}}}
{"a":3,"b":"3","wrapper":{"inside":"3.1", "map": {"k": "v"}}}

The classes:

class MyInnerClass(var inside: String, var map: Map[String, String]) extends java.io.Serializable {
  def getInside(): String = {inside}
  def setInside(newInside: String) {inside = newInside}
  def getMap(): Map[String, String] = {map}
  def setMap(newMap: Map[String, String]) {map = newMap}
}

class MyClass(var a: Int, var b: String, var wrapper: MyInnerClass)  extends java.io.Serializable {
  def getA(): Int = {a}
  def setA(newA: Int) {a = newA}
  def getB(): String = {b}
  def setB(newB: String) {b = newB}
  def getWrapper(): MyInnerClass = {wrapper}
  def setWrapper(newWrapper: MyInnerClass) {wrapper = newWrapper}
}

So I want to do:

val json = spark.read.json("path/to/json")
json.as[MyClass]

However, this throws:

Unable to find encoder for type stored in a Dataset.  Primitive type (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
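
A minimal sketch of what this message means (an editorial addition, assuming a live SparkSession named spark): the implicits only derive Encoders for primitives, tuples, and case classes, never for arbitrary classes.

import spark.implicits._

// Derived automatically: primitives, tuples, case classes
val ok = Seq(1, 2, 3).toDS()  // uses the derived Encoder[Int]

// Not derived for plain classes: without an explicit Encoder[MyClass]
// in scope, the next line would not compile.
// val notOk = Seq(new MyClass(1, "1", null)).toDS()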

So, I found out about custom Encoders (here and here):

import org.apache.spark.sql.Encoders

// kryo serializes the whole object into a single binary column
val kryoMyClassEncoder = Encoders.kryo[MyClass]
json.as[MyClass](kryoMyClassEncoder)

which throws:

Try to map struct<a:bigint,b:string,wrapper:struct<inside:string,map:struct<k:string>>> to Tuple1, but failed as the number of fields does not line up
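
The fields fail to line up because the kryo encoder expects a single binary value column, while the DataFrame has three top-level columns. One hedged workaround (not from the original post) is to construct the objects by hand from each Row, so that kryo only has to serialize the result:

// Editorial sketch; field names follow the sample JSON above
val ds = json.map { row =>
  val w = row.getStruct(row.fieldIndex("wrapper"))
  // "map" was inferred as struct<k:string>, so rebuild the Scala Map by hand
  val m = w.getStruct(w.fieldIndex("map"))
  val inner = new MyInnerClass(w.getString(w.fieldIndex("inside")), Map("k" -> m.getString(0)))
  new MyClass(row.getLong(row.fieldIndex("a")).toInt, row.getString(row.fieldIndex("b")), inner)
}(kryoMyClassEncoder)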

So how can I convert a DataFrame to a Dataset of a custom class?

Answer

Instead of using the kryo encoder, try using the product encoder, i.e.:

val productMyClassEncoder = Encoders.product[MyClass]
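
A caveat worth adding (editorial, not part of the original answer): Encoders.product[T] requires T to be a Product, i.e. a case class or tuple, which the bean-style MyClass above is not. A hedged sketch of the two routes that do fit, using a hypothetical case-class mirror (MyRow and MyInnerRow are not names from the post):

import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical case-class mirror of MyClass; a is Long because the JSON
// reader infers integers as bigint, and as[] will not down-cast to Int
case class MyInnerRow(inside: String, map: Map[String, String])
case class MyRow(a: Long, b: String, wrapper: MyInnerRow)

val productEncoder: Encoder[MyRow] = Encoders.product[MyRow]
// May still fail at analysis time: the JSON reader inferred "map" as a
// struct, not a MapType, so an explicit read schema may be needed too
val ds = json.as[MyRow](productEncoder)

// Bean alternative for getter/setter classes; MyClass as written lacks the
// public no-arg constructor that Encoders.bean expects, hence commented out:
// val beanEncoder: Encoder[MyClass] = Encoders.bean(classOf[MyClass])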
