How to store custom objects in Dataset?


Problem description

As per Introducing Spark Datasets:

As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate encoders for a wide variety of types, we’d like to open up an API for custom objects.

and attempts to store a custom type in a Dataset lead to errors like the following:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

or:

java.lang.UnsupportedOperationException: No Encoder found for ....

Are there any existing workarounds?

Note this question exists only as an entry point for a Community Wiki answer. Feel free to update / improve both question and answer.

Recommended answer

Update

This answer is still valid and informative, although things have improved since 2.2/2.3, which added built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you stick to making types with only case classes and the usual Scala types, you should be fine with just the implicits in SQLImplicits.
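
For instance, on 2.3+ a case class built only from case classes and the common types above needs nothing beyond the session implicits. A minimal sketch, assuming a SparkSession bound to spark (the Event class and its fields are made up for illustration):

import java.sql.Timestamp
import spark.implicits._

// every field type here is covered by the built-in encoders on 2.3+
case class Event(id: BigDecimal, tags: Set[String], at: Timestamp, props: Map[String, Seq[Int]])

val events = spark.createDataset(Seq(
  Event(BigDecimal(1), Set("a", "b"), new Timestamp(0L), Map("k" -> Seq(1, 2)))
))
events.printSchema()  // proper SQL types throughout, no binary blobs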

Unfortunately, virtually nothing has been added to help with this. Searching for @since 2.0.0 in Encoders.scala or SQLImplicits.scala turns up things mostly to do with primitive types (and some tweaking of case classes). So, the first thing to say: there is currently no really good support for custom class encoders. With that out of the way, what follows are some tricks that do as good a job as we can hope for, given what we currently have at our disposal. As an upfront disclaimer: this won't work perfectly, and I'll do my best to make all the limitations clear up front.

When you want to make a dataset, Spark "requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders" (taken from the docs on createDataset). An encoder will take the form Encoder[T] where T is the type you are encoding. The first suggestion is to add import spark.implicits._ (which gives you these implicit encoders) and the second suggestion is to explicitly pass in the implicit encoder using this set of encoder related functions.
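
To make those two routes concrete, here is a minimal sketch (the Person case class and the spark session name are assumptions for illustration): the first dataset picks up its encoder implicitly, the second is handed one built explicitly from the static methods on Encoders.

import org.apache.spark.sql.Encoders

case class Person(name: String, age: Int)

// Route 1: let the implicits from the session resolve Encoder[Person]
import spark.implicits._
val ds1 = spark.createDataset(Seq(Person("Ann", 30)))

// Route 2: build the encoder explicitly and pass it to createDataset
val ds2 = spark.createDataset(Seq(Person("Bob", 41)))(Encoders.product[Person])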

There is no encoder available for regular classes, so

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

will give you the following implicit-related compile-time error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

However, if you wrap whatever type you just used to get the above error in some class that extends Product, the error confusingly gets delayed to runtime, so

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compiles just fine, but fails at runtime with

java.lang.UnsupportedOperationException: No Encoder found for MyObj

The reason for this is that the encoders Spark creates with the implicits are actually only made at runtime (via Scala reflection). In this case, all Spark checks at compile time is that the outermost class extends Product (which all case classes do), and it only realizes at runtime that it still doesn't know what to do with MyObj (the same problem occurs if I try to make a Dataset[(Int,MyObj)] - Spark waits until runtime to barf on MyObj; a short sketch of this case follows the list below). These are central problems that are in dire need of being fixed:

  • some classes that extend Product compile despite always crashing at runtime and
  • there is no way of passing in custom encoders for nested types (I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).
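
As a quick sketch of that Dataset[(Int,MyObj)] case (assuming a spark session and spark.implicits._ in scope, as in the snippets above): the tuple is a Product, so the code compiles, but the encoder derivation still trips over the MyObj field at runtime.

class MyObj(val i: Int)

// Compiles: (Int, MyObj) is a Product, so the implicit product encoder applies ...
val pairs = spark.createDataset(Seq((1, new MyObj(1)), (2, new MyObj(2))))
// ... but it blows up at runtime with something like:
// java.lang.UnsupportedOperationException: No Encoder found for MyObj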

The solution everyone suggests is to use the kryo encoder.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

This gets pretty tedious fast, though, especially if your code is manipulating all sorts of datasets, joining, grouping, etc. You end up racking up a bunch of extra implicits. So, why not just make an implicit that does this all automatically?

import scala.reflect.ClassTag
import org.apache.spark.sql.{Encoder, Encoders}

implicit def kryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] =
  Encoders.kryo[A](ct)

And now, it seems like I can do almost anything I want (the example below won't work in the spark-shell where spark.implicits._ is automatically imported)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Or almost. The problem is that using kryo leads to Spark just storing every row in the dataset as a flat binary object. For map, filter, foreach that is enough, but for operations like join, Spark really needs these to be separated into columns. Inspecting the schema for d2 or d3, you see there is just one binary column:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Partial solution for tuples

So, using the magic of implicits in Scala (more in 6.26.3 Overloading Resolution), I can make myself a series of implicits that will do as good a job as possible, at least for tuples, and will work well with existing implicits:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Then, armed with these implicits, I can make my example above work, albeit with some column renaming:

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I haven't yet figured out how to get the expected tuple names (_1, _2, ...) by default without renaming them - if someone else wants to play around with this, this is where the name "value" gets introduced and this is where the tuple names are usually added. However, the key point is that I now have a nice structured schema:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

In summary, this workaround:

  • allows us to get separate columns for tuples (so we can join on tuples again, yay!)
  • we can again just rely on the implicits (so no need to be passing in kryo all over the place)
  • is almost entirely backwards compatible with import spark.implicits._ (with some renaming involved)
  • does not let us join on the kryo-serialized binary columns, let alone on the fields those may have
  • has the unpleasant side-effect of renaming some of the tuple columns to "value" (if necessary, this can be undone by converting to a DataFrame with .toDF, specifying new column names, and converting back to a dataset - see the sketch after this list - and the schema names seem to be preserved through joins, where they are most needed).
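
A small sketch of that last renaming point, reusing the hypothetical d1 and the tuple encoders from above: if a mapped column surfaces under a generic name, a round trip through toDF restores the names you want.

// map to a tuple, rename via toDF, then convert back to a typed Dataset
val remapped = d1.map(d => (d.i, d)).toDF("_1", "_2").as[(Int, MyObj)]
remapped.printSchema()  // columns now show up as _1 / _2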

This one is less pleasant and has no good solution. However, now that we have the tuple solution above, I have a hunch the implicit conversion solution from another answer will be a bit less painful too since you can convert your more complex classes to tuples. Then, after creating the dataset, you'd probably rename the columns using the dataframe approach. If all goes well, this is really an improvement since I can now perform joins on the fields of my classes. If I had just used one flat binary kryo serializer that wouldn't have been possible.

Here is an example that does a bit of everything: I have a class MyObj which has fields of types Int, java.util.UUID, and Set[String]. The first takes care of itself. The second, although I could serialize it using kryo, would be more useful stored as a String (since UUIDs are usually something I'll want to join against). The third really just belongs in a binary column.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Now, I can create a dataset with a nice schema using this machinery:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

And the schema shows me columns with the right names, with the first two being things I can join against.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
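
As a follow-up sketch of the "things I can join against" point, reusing the hypothetical d from above: since i and u are now plain integer and string columns, ordinary column joins work on them.

// a self-join on the string-typed u column, done at the DataFrame level
val joined = d.toDF().as("a").join(d.toDF().as("b"), $"a.u" === $"b.u")
joined.printSchema()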
