How to create a custom Encoder in Spark 2.X Datasets?


Problem Description



Spark Datasets move away from Rows to Encoders for POJOs/primitives. The Catalyst engine uses an ExpressionEncoder to convert columns in a SQL expression. However, there do not appear to be any other subclasses of Encoder available to use as a template for our own implementations.

Here is an example of code that is happy in Spark 1.X / DataFrames but does not compile in the new regime:

//mapping each row to an RDD tuple
df.map(row => {
    val id: String = if (!has_id) "" else row.getAs[String]("id")
    val label: String = row.getAs[String]("label")
    val channels: Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height: Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width: Int = if (!has_width) 0 else row.getAs[Int]("width")
    // the "data" column may arrive either as a String or as a byte array
    val data: Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte @unchecked] => arr
      case _ =>
        log.error("Unsupported value type")
        null
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

We get a compiler error of

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

So then somehow/somewhere there should be a means to

  • Define/implement our custom Encoder
  • Apply it when performing a mapping on the DataFrame (which is now a Dataset of type Row)
  • Register the Encoder for use by other custom code

I am looking for code that successfully performs these steps.

Solution

As far as I am aware, nothing has really changed since 1.6, and the solutions described in How to store custom objects in a Dataset in Spark 1.6 are still the only available options. Nevertheless, your current code should work just fine with the default encoders for product types.
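To make that concrete, here is a minimal, self-contained sketch of what "default encoders for product types" buys you; the SparkSession setup, the Record case class, and the column names below are illustrative, not taken from the question:

import org.apache.spark.sql.SparkSession

object DefaultEncoderDemo {
  // A case class is a Product type, so Spark derives its Encoder automatically.
  case class Record(id: String, label: String, data: Array[Byte])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("default-encoders").master("local[*]").getOrCreate()
    import spark.implicits._ // encoders for primitives, tuples and case classes

    val df = Seq(("a", "cat", "meow"), ("b", "dog", "woof")).toDF("id", "label", "data")

    // map on a DataFrame (Dataset[Row]) compiles because an Encoder[Record]
    // is resolved implicitly from spark.implicits._ -- no custom Encoder needed.
    val ds = df.map { row =>
      Record(row.getAs[String]("id"), row.getAs[String]("label"), row.getAs[String]("data").getBytes)
    }
    ds.show()
    spark.stop()
  }
}

A tuple such as the one returned in the question works the same way, since tuples are Product types as well.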

To get some insight into why your code worked in 1.x and may not work in 2.0.0, you'll have to check the signatures. In 1.x, DataFrame.map is a method which takes a function Row => T and transforms an RDD[Row] into an RDD[T].
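For reference, the two signatures being compared, paraphrased from the respective Scala API docs (the exact implicit-parameter spelling may differ slightly between minor versions):

// Spark 1.6.x -- DataFrame.map drops out of the SQL world into an RDD:
//   def map[R: ClassTag](f: Row => R): RDD[R]

// Spark 2.x -- DataFrame is just Dataset[Row], and map stays a Dataset,
// which is why an Encoder for the result type is now required:
//   def map[U: Encoder](func: Row => U): Dataset[U]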

In 2.0.0, DataFrame.map takes a function of type Row => T as well, but transforms Dataset[Row] (a.k.a. DataFrame) into Dataset[T], hence T requires an Encoder. If you want to get the "old" behavior, you should use RDD explicitly:

df.rdd.map(row => ???)
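Building on that, a hedged sketch of two follow-ups, assuming spark is the active SparkSession and df and its columns are the ones from the question: coming back from the RDD to a Dataset (which needs an Encoder again), and, for a type with no built-in encoder at all, registering one of the explicit encoders described in the linked answer (the Foo class is purely illustrative):

import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

// Old-style processing on the RDD, then back into the Dataset world;
// the implicit tuple encoder from spark.implicits._ satisfies createDataset.
val tupleRdd = df.rdd.map(row => (row.getAs[String]("id"), row.getAs[String]("label")))
val tupleDs  = spark.createDataset(tupleRdd)

// For a type that is not a Product and has no built-in encoder, one option
// from the linked answer is a binary (Kryo) encoder; the resulting Dataset
// stores each object as a single opaque "value" column.
class Foo(val bar: String)
implicit val fooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
val fooDs = df.map(row => new Foo(row.getAs[String]("id")))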
