Serialize/Deserialize existing class for spark sql dataframe
Question
Using Spark 1.6.0. Say I have a class like this:
case class MyClass(date: java.util.Date, oid: org.bson.types.ObjectId)
If I have:
//rdd: RDD[MyClass]
rdd.toDF("date", "oid")
I get: java.lang.UnsupportedOperationException: Schema for type java.util.Date/org.bson.types.ObjectId is not supported
Now I know I can make it a java.sql.Date, but let's say MyClass is depended upon in too many other places to make that change everywhere; and that still won't solve the ObjectId problem.
I am also aware of the UserDefinedType option, but it seems that only works if you also create a new class to work with it (and again, the signature of MyClass needs to stay the same).
Is there not a way to just register a serializer/deserializer for java.util.Date and org.bson.types.ObjectId, so that I can call toDF on the RDD[MyClass] and it will just work?
UPDATE
This doesn't exactly answer my question, but it unblocked us, so I'll share it here in the hope that it helps someone else. Most JSON libraries support this use case, and Spark SQL has a built-in sqlContext.read.json(stringRdd).write.parquet("/path/to/output"). So you can define the (de)serialization for the class using your JSON library of choice, serialize to strings, and Spark SQL can handle the rest.
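A rough sketch of that workaround follows. json4s is an arbitrary choice of JSON library here, and the custom serializers for the problem types are assumed rather than shown; a live SparkContext (sc) and SQLContext (sqlContext), e.g. from spark-shell, are also assumed:

```scala
// Sketch of the JSON round-trip workaround. Any JSON library that lets you
// register (de)serializers for java.util.Date and ObjectId would work.
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// In practice you would register custom serializers for ObjectId here.
implicit val formats = DefaultFormats

// rdd: RDD[MyClass]
val stringRdd = rdd.map(Serialization.write(_))

// Spark SQL infers a schema from the JSON strings and writes Parquet.
sqlContext.read.json(stringRdd).write.parquet("/path/to/output")
```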
Answer
It depends on what you mean by "just work". To serialize/deserialize an object, all you need is a corresponding UserDefinedType and the proper annotation. For example, something like this:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.bson.types.ObjectId

@SQLUserDefinedType(udt = classOf[MyClassUDT])
case class MyClass(date: java.util.Date, oid: ObjectId)

class MyClassUDT extends UserDefinedType[MyClass] {

  // Internal representation: a date column and the ObjectId as a string.
  override def sqlType: StructType = StructType(Seq(
    StructField("date", DateType, nullable = false),
    StructField("oid", StringType, nullable = false)
  ))

  override def serialize(obj: Any): InternalRow = {
    obj match {
      case MyClass(date, oid) =>
        val row = new GenericMutableRow(2)
        row(0) = new java.sql.Date(date.getTime)
        row(1) = UTF8String.fromString(oid.toString)
        row
    }
  }

  override def deserialize(datum: Any): MyClass = {
    datum match {
      case row: InternalRow =>
        val date: java.util.Date = new java.util.Date(
          row.get(0, DateType).asInstanceOf[java.sql.Date].getTime()
        )
        val oid = new ObjectId(row.getString(1))
        MyClass(date, oid)
    }
  }

  override def userClass: Class[MyClass] = classOf[MyClass]
}
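With the annotation in place, the conversion from the question should go through. A minimal usage sketch, assuming a spark-shell session (so sc and sqlContext exist) and the UDT above on the classpath; the column name is an arbitrary choice:

```scala
// Usage sketch: the annotated class is stored as a single UDT column
// rather than two flat columns.
import sqlContext.implicits._

val rdd = sc.parallelize(Seq(
  Tuple1(MyClass(new java.util.Date(), new ObjectId()))
))

val df = rdd.toDF("my_class")
df.printSchema()
```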
It doesn't mean that you'll be able to access any methods defined on the class, or any of its fields, directly. To be able to do that you'll need UDFs.
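For example, a UDF can deserialize the UDT column back to MyClass and pull a field out as an ordinary column. A sketch, where df, the my_class column name, and getOid are all hypothetical:

```scala
import org.apache.spark.sql.functions.udf

// Hypothetical UDF: deserializes the UDT column back to MyClass
// and extracts the ObjectId as a plain string column.
val getOid = udf((mc: MyClass) => mc.oid.toString)

df.withColumn("oid", getOid(df("my_class")))
```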
A little bit closer to seamless integration are Spark Datasets, but AFAIK it is not yet possible to define custom encoders.