How to store custom objects in Dataset?

Question

As per Spark Datasets:

As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate encoders for a wide variety of types, we’d like to open up an API for custom objects.

and attempts to store a custom type in a Dataset lead to errors like the following:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

Or:

Java.lang.UnsupportedOperationException: No Encoder found for ....

Are there any existing workarounds?

Note this question exists only as an entry point for a Community Wiki answer. Feel free to update / improve both question and answer.

Answer

Update

This answer is still valid and informative, although things are now better since 2.2/2.3, which adds built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you stick to making types with only case classes and the usual Scala types, you should be fine with just the implicit in SQLImplicits.
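
As a minimal sketch of that (assuming Spark 2.3+ and an existing SparkSession named spark), a case class like the hypothetical Event below encodes with the stock implicits alone:

import java.sql.Timestamp

case class Event(id: Int, tags: Set[String], scores: Map[String, BigDecimal], at: Timestamp)

import spark.implicits._  // derives the built-in Encoder[Event] automatically

val events = spark.createDataset(Seq(
  Event(1, Set("a", "b"), Map("x" -> BigDecimal(1.5)), Timestamp.valueOf("2018-01-01 00:00:00"))
))
events.printSchema  // every field gets a real column type - no binary blobs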

Unfortunately, virtually nothing has been added to help with this. Searching for @since 2.0.0 in Encoders.scala or SQLImplicits.scala finds things mostly to do with primitive types (and some tweaking of case classes). So, first thing to say: there currently is no real good support for custom class encoders. With that out of the way, what follows is some tricks which do as good a job as we can ever hope to, given what we currently have at our disposal. As an upfront disclaimer: this won't work perfectly and I'll do my best to make all limitations clear and upfront.

要创建数据集时,Spark需要一个编码器(将类型T的JVM对象与内部Spark SQL表示形式相互转换),该编码器通常是通过SparkSession的隐式自动创建的,或者可以是通过在Encoders上调用静态方法显式创建的"(取自

When you want to make a dataset, Spark "requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is generally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders" (taken from the docs on createDataset). An encoder will take the form Encoder[T] where T is the type you are encoding. The first suggestion is to add import spark.implicits._ (which gives you these implicit encoders) and the second suggestion is to explicitly pass in the implicit encoder using this set of encoder related functions.
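
As a minimal sketch of that second route (the Person case class here is just an illustration), an encoder can be passed to createDataset explicitly via the factory methods on Encoders:

import org.apache.spark.sql.Encoders

case class Person(name: String, age: Int)

// primitive encoder passed explicitly rather than summoned implicitly
val ints = spark.createDataset(Seq(1, 2, 3))(Encoders.scalaInt)
// Product (case class) encoder passed explicitly
val people = spark.createDataset(Seq(Person("a", 1)))(Encoders.product[Person])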

There is no encoder available for regular classes, so

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

will give you the following implicit-related compile-time error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

However, if you wrap whatever type you just used to get the above error in some class that extends Product, the error confusingly gets delayed to runtime, so

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compiles just fine, but fails at runtime with

java.lang.UnsupportedOperationException: No Encoder found for MyObj

The reason for this is that the encoders Spark creates with the implicits are actually only made at runtime (via scala reflection). In this case, all Spark checks at compile time is that the outermost class extends Product (which all case classes do), and only realizes at runtime that it still doesn't know what to do with MyObj (the same problem occurs if I tried to make a Dataset[(Int,MyObj)] - Spark waits until runtime to barf on MyObj). These are central problems that are in dire need of being fixed:

  • some classes that extend Product compile despite always crashing at runtime and
  • there is no way of passing in custom encoders for nested types (I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).

The solution everyone suggests is to use the kryo encoder.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

This gets pretty tedious fast though. Especially if your code is manipulating all sorts of datasets, joining, grouping etc. You end up racking up a bunch of extra implicits. So, why not just make an implicit that does this all automatically?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

And now, it seems like I can do almost anything I want (the example below won't work in the spark-shell where spark.implicits._ is automatically imported)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Or almost. The problem is that using kryo leads to Spark just storing every row in the dataset as a flat binary object. For map, filter, foreach that is enough, but for operations like join, Spark really needs these to be separated into columns. Inspecting the schema for d2 or d3, you see there is just one binary column:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Partial solution for tuples

So, using the magic of implicits in Scala (more in 6.26.3 Overloading Resolution), I can make myself a series of implicits that will do as good a job as possible, at least for tuples, and will work well with existing implicits:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Then, armed with these implicits, I can make my example above work, albeit with some column renaming

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I haven't yet figured out how to get the expected tuple names (_1, _2, ...) by default without renaming them - if someone else wants to play around with this, this is where the name "value" gets introduced and this is where the tuple names are usually added. However, the key point is that I now have a nice structured schema:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

So, in summary, this workaround:

  • allows us to get separate columns for tuples (so we can join on tuples again, yay!)
  • we can again just rely on the implicits (so no need to be passing in kryo all over the place)
  • is almost entirely backwards compatible with import spark.implicits._ (with some renaming involved)
  • does not let us join on the kryo-serialized binary columns, let alone on fields those may have
  • has the unpleasant side-effect of renaming some of the tuple columns to "value" (if necessary, this can be undone by converting .toDF, specifying new column names, and converting back to a dataset - and the schema names seem to be preserved through joins, where they are most needed).

This one is less pleasant and has no good solution. However, now that we have the tuple solution above, I have a hunch the implicit conversion solution from another answer will be a bit less painful too since you can convert your more complex classes to tuples. Then, after creating the dataset, you'd probably rename the columns using the dataframe approach. If all goes well, this is really an improvement since I can now perform joins on the fields of my classes. If I had just used one flat binary kryo serializer that wouldn't have been possible.

Here is an example that does a bit of everything: I have a class MyObj which has fields of types Int, java.util.UUID, and Set[String]. The first takes care of itself. The second, although I could serialize it using kryo, would be more useful stored as a String (since UUIDs are usually something I'll want to join against). The third really just belongs in a binary column.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Now, I can create a dataset with a nice schema using this machinery:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

And the schema shows me columns with the right names, and the first two are both things I can join against.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
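
And, as a follow-up sketch, the string-typed u column is now something that can actually be joined on (a self-join here, purely for illustration) - something a single kryo-serialized binary column would not have allowed:

val joined = d.alias("a").joinWith(d.alias("b"), $"a.u" === $"b.u")
joined.printSchema  // two MyObjEncoded structs, matched on the UUID string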
