Impossible to operate on custom type after it is encoded? Spark Dataset


Question

Say you have this (the approach for encoding the custom type comes from this thread):

// Assume we need to handle this custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

When I do ds.show, I get:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

I understand that it's because the contents are encoded into Spark SQL's internal binary representation. But how can I display the decoded content like this?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+


UPDATE 1

Displaying the content is not the biggest issue; more importantly, it can lead to problems when processing the Dataset. Consider this example:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value); 
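
The error makes sense once you look at the schema: with Encoders.kryo the whole object is serialized into a single binary column named value, so there is no i column for the analyzer to resolve. A quick check (the output below matches the printSchema shown in the answer):

// The kryo encoder stores the entire object in one binary column,
// so the join condition has no "i" column to resolve.
scala> ds.printSchema
root
 |-- value: binary (nullable = true)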

Does this mean that a kryo-encoded type cannot conveniently perform operations like joinWith?

How do we process a custom type in a Dataset, then?
If we can't operate on it after it's encoded, what's the point of this kryo encoding solution for custom types?!

(The solution provided by @jacek below is good to know for case class types, but it still cannot decode a custom type.)
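
One workaround, offered here as a sketch of my own rather than something from the original post: a kryo-encoded Dataset still supports typed transformations such as map, so you can first map the objects into tuples, which use Spark's built-in product encoder and therefore expose real columns (_1, _2) to join on. Assuming a spark-shell session with spark.implicits._ in scope:

// Sketch: map to tuple Datasets first, then join on the tuple columns
val left  = ds.map(o => (o.i, o.j))    // Dataset[(Int, String)], product encoder
val right = ds2.map(o => (o.i, o.j))
left.joinWith(right, left("_1") === right("_1"), "inner")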

Answer

The following worked for me, but it seems like using a high-level API to do low-level (deserialization) work.

This is not to say it should be done this way, but it shows that it's possible.

I don't know why KryoDeserializer does not deserialize the bytes back into the object they came from. It is just this way.

One major difference between your class definition and mine is this case keyword, which let me use the following trick. Again, I have no idea exactly why it makes this possible.

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] => 
  import java.nio.ByteBuffer
  ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

// Deserialize the binary column into a struct, then expand its fields (i, j)
val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
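
The "begs for a generic UDF" comment above can be taken one step further. This is a sketch of my own, not part of the original answer: parameterize the deserializer over the type. The hypothetical kryoDeserUdf helper should work for return types Spark SQL reflection can handle, such as the case class above:

// Hypothetical generic variant of deserMyObj (a sketch, not from the answer)
import java.nio.ByteBuffer
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.functions.udf

def kryoDeserUdf[T : TypeTag : ClassTag] = udf { bytes: Array[Byte] =>
  // reuse the KryoSerializer instance `ks` defined above
  ks.newInstance.deserialize[T](ByteBuffer.wrap(bytes))
}

val solution2 = ds.select(kryoDeserUdf[MyObj]($"value") as "result").select($"result.*")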

