Serialize/Deserialize existing class for spark sql dataframe
Question
Using Spark 1.6.0. Say I have a class like this:
case class MyClass(date: java.util.Date, oid: org.bson.types.ObjectId)
If I have
//rdd: RDD[MyClass]
rdd.toDF("date", "oid")
I get java.lang.UnsupportedOperationException: Schema for type java.util.Date/org.bson.types.ObjectId is not supported
Now, I know I can make it a java.sql.Date, but let's say MyClass is depended upon in too many other places to make that change everywhere, and that still won't solve the ObjectId problem.
I am also aware of the UserDefinedType option. But it seems like that only works if you also create a new class to work with it (and again, the signature of MyClass needs to stay the same).
Is there not a way to just register a serializer/deserializer for java.util.Date and org.bson.types.ObjectId, so that I can call toDF on the RDD[MyClass] and it will just work?
UPDATE
This doesn't exactly answer my question, but it unblocked us, so I will share it here in the hope that it's helpful to someone else. Most JSON libraries do support this use case, and spark-sql has a built-in sqlContext.read.json(stringRdd).write.parquet("/path/to/output"). So you can just define the (de)serializer for the class using your JSON library of choice, serialize to a string, and spark-sql can handle the rest.
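A minimal sketch of that workaround, assuming json4s as the JSON library (the ObjectIdSerializer below is a hypothetical helper we define ourselves, not something shipped with any library, and rdd / sqlContext are assumed to be in scope):

```scala
import org.bson.types.ObjectId
import org.json4s.{CustomSerializer, JString, NoTypeHints}
import org.json4s.native.Serialization

// Hypothetical custom (de)serializer: round-trip ObjectId as its hex string
class ObjectIdSerializer extends CustomSerializer[ObjectId](_ => (
  { case JString(s) => new ObjectId(s) },
  { case oid: ObjectId => JString(oid.toString) }
))

implicit val formats = Serialization.formats(NoTypeHints) + new ObjectIdSerializer

// rdd: RDD[MyClass] -- serialize each record to a JSON string,
// then let spark-sql infer the schema and write the result out
val stringRdd = rdd.map(record => Serialization.write(record))
sqlContext.read.json(stringRdd).write.parquet("/path/to/output")
```

The point of the custom serializer is only to pick a stable, schema-friendly string form for ObjectId; java.util.Date is handled by json4s' default date format.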
Answer
It depends on what you mean by "just work". To serialize / deserialize an object, all you need is a corresponding UserDefinedType and proper annotations. For example, something like this:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.bson.types.ObjectId

@SQLUserDefinedType(udt = classOf[MyClassUDT])
case class MyClass(date: java.util.Date, oid: ObjectId)

class MyClassUDT extends UserDefinedType[MyClass] {
  // Internal representation: a DateType column plus the ObjectId as a string
  override def sqlType: StructType = StructType(Seq(
    StructField("date", DateType, nullable = false),
    StructField("oid", StringType, nullable = false)
  ))

  override def serialize(obj: Any): InternalRow = obj match {
    case MyClass(date, oid) =>
      val row = new GenericMutableRow(2)
      // Catalyst stores DateType internally as days since epoch (an Int)
      row(0) = DateTimeUtils.fromJavaDate(new java.sql.Date(date.getTime))
      row(1) = UTF8String.fromString(oid.toString)
      row
  }

  override def deserialize(datum: Any): MyClass = datum match {
    case row: InternalRow =>
      val date = new java.util.Date(DateTimeUtils.toJavaDate(row.getInt(0)).getTime)
      val oid = new ObjectId(row.getString(1))
      MyClass(date, oid)
  }

  override def userClass: Class[MyClass] = classOf[MyClass]
}
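As a quick usage sketch (a running SparkContext sc is assumed, and the column names are arbitrary), MyClass values can then travel inside a DataFrame as a single UDT-typed column:

```scala
import org.bson.types.ObjectId

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// MyClass rides along as one UDT column next to ordinary columns
val df = sc.parallelize(Seq(
  (1, MyClass(new java.util.Date(), new ObjectId()))
)).toDF("id", "obj")

df.printSchema()
```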
It doesn't mean that you'll be able to access any method defined on the class, or any of its fields. To be able to do that you'll need UDFs.
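For example, a sketch of a UDF that reaches back into the deserialized object (the DataFrame df and its column name obj are assumptions carried over from the example above):

```scala
import org.apache.spark.sql.functions.udf

// Spark deserializes the UDT column back to MyClass before the function runs,
// so fields and methods of MyClass are accessible inside the UDF
val oidHex = udf((c: MyClass) => c.oid.toString)

df.select(oidHex(df("obj")).alias("oid")).show()
```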
A little bit closer to seamless integration are Spark Datasets, but as far as I know it is not possible to define custom encoders yet.