Serialize/Deserialize existing class for spark sql dataframe


Problem Description

Using Spark 1.6.0, say I have a class like this:

case class MyClass(date: java.util.Date, oid: org.bson.types.ObjectId)

If I have:

//rdd: RDD[MyClass]
rdd.toDF("date", "oid")

I get java.lang.UnsupportedOperationException: Schema for type java.util.Date/org.bson.types.ObjectId is not supported.

Now I know I can make it a java.sql.Date, but let's say MyClass is depended upon in too many other places to make that change everywhere; and that still won't solve the ObjectId problem.

I am also aware of the UserDefinedType option, but it seems that only works if you also create a new class to work with it (and again, the signature of MyClass needs to stay the same).

Is there not a way to just register a serializer/deserializer for java.util.Date and org.bson.types.ObjectId so that I can call toDF on the RDD[MyClass] and it will just work?

UPDATE

So this doesn't exactly answer my question, but it unblocked us, so I'll share it here in the hope that it helps someone else. Most JSON libraries support this use case, and spark-sql has a built-in sqlContext.read.json(stringRdd).write.parquet("/path/to/output"). So you can just define the (de)serialization for the class using your JSON library of choice, serialize to a string, and then spark-sql can handle the rest.
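
As a concrete illustration, here is a minimal sketch of that workaround using json4s; the ObjectIdSerializer below is my own assumption about how you might wire it up, not part of the original post (java.util.Date is already covered by json4s' DefaultFormats):

import org.json4s._
import org.json4s.jackson.Serialization
import org.bson.types.ObjectId

// Teach the JSON library how to (de)serialize ObjectId as its hex string.
case object ObjectIdSerializer extends CustomSerializer[ObjectId](_ => (
  { case JString(s) => new ObjectId(s) },
  { case oid: ObjectId => JString(oid.toString) }
))

implicit val formats: Formats = DefaultFormats + ObjectIdSerializer

// rdd: RDD[MyClass]
val asJson = rdd.map(Serialization.write(_))
sqlContext.read.json(asJson).write.parquet("/path/to/output")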

Answer

It depends on what you mean by "just work". To serialize/deserialize an object, all you need is a corresponding UserDefinedType and the proper annotations. For example, something like this:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.bson.types.ObjectId

@SQLUserDefinedType(udt = classOf[MyClassUDT])
case class MyClass(date: java.util.Date, oid: ObjectId)

class MyClassUDT extends UserDefinedType[MyClass] {
  // The relational representation used inside Spark SQL.
  override def sqlType: StructType = StructType(Seq(
    StructField("date", DateType, nullable = false),
    StructField("oid", StringType, nullable = false)
  ))

  override def serialize(obj: Any): InternalRow = {
    obj match {
      case MyClass(date, oid) =>
        val row = new GenericMutableRow(2)
        // DateType is stored internally as an Int (days since the epoch),
        // so convert instead of storing the java.sql.Date itself.
        row(0) = DateTimeUtils.fromJavaDate(new java.sql.Date(date.getTime))
        row(1) = UTF8String.fromString(oid.toString)
        row
    }
  }

  override def deserialize(datum: Any): MyClass = {
    datum match {
      case row: InternalRow =>
        val date = new java.util.Date(
          DateTimeUtils.toJavaDate(row.getInt(0)).getTime
        )
        val oid = new ObjectId(row.getString(1))
        MyClass(date, oid)
    }
  }

  override def userClass: Class[MyClass] = classOf[MyClass]
}
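
A hypothetical usage sketch (my addition, not from the original answer): wrapping each value in Tuple1 so that the whole MyClass lands in a single UDT column.

import sqlContext.implicits._

// Spark's reflection picks up MyClassUDT from the annotation,
// so MyClass values can live in a DataFrame column.
val df = sc.parallelize(Seq(
  Tuple1(MyClass(new java.util.Date(), new ObjectId()))
)).toDF("myClass")

df.printSchema()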

This doesn't mean that you'll be able to access any method defined on the class or any of its fields. To be able to do that, you'll need UDFs.
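
For example, a minimal sketch of reaching a field through a UDF (the df and the column name myClass come from the hypothetical usage above):

import org.apache.spark.sql.functions.udf

// Because of the UDT, Spark can pass MyClass values into a UDF,
// which can then call ordinary methods on them.
val oidString = udf((mc: MyClass) => mc.oid.toString)

df.withColumn("oidString", oidString(df("myClass")))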

A little bit closer to seamless integration are Spark Datasets, but AFAIK it is not yet possible to define custom encoders.
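
One possible stopgap (my addition, not part of the original answer) is the generic binary encoder that ships with Spark 1.6, Encoders.kryo; the whole object is stored as a single opaque binary column, so you lose the columnar structure:

import org.apache.spark.sql.Encoders

// Stopgap sketch: a Kryo-based encoder. The Dataset works, but the data
// is one opaque binary column, so no pruning or predicate pushdown.
implicit val myClassEncoder = Encoders.kryo[MyClass]

// rdd: RDD[MyClass]
val ds = sqlContext.createDataset(rdd)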
