Spark: No Encoder found for java.io.Serializable in Map[String, java.io.Serializable]


Problem description

I am writing a Spark job whose dataset is quite flexible; it is defined as Dataset[Map[String, java.io.Serializable]].

Now the problem shows up: the Spark runtime complains No Encoder found for java.io.Serializable. I have tried Kryo serde, and it still shows the same error message.

The reason I have to use this odd Dataset type is that I have flexible fields per row, and the map looks like:

Map(
  "a" -> 1,
  "b" -> "bbb",
  "c" -> 0.1,
  ...
)

Is there any way in Spark to handle this flexible dataset type?
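
For context, here is a minimal sketch (assuming Spark 2.3+, where the built-in implicits provide map encoders; names are illustrative): a map with a single concrete value type encodes fine, and it is the interface value type java.io.Serializable that breaks encoder derivation.

import org.apache.spark.sql.{Dataset, SparkSession}

object EncoderCheck extends App {
  val spark: SparkSession = SparkSession.builder().master("local[2]").getOrCreate()
  import spark.implicits._

  // A map with one concrete value type encodes fine with the built-in implicits:
  val ok: Dataset[Map[String, String]] = Seq(Map("a" -> "1", "b" -> "bbb")).toDS()
  ok.show()

  // Typing the values as the java.io.Serializable interface is what fails:
  // val bad = Seq(Map[String, java.io.Serializable]("a" -> Int.box(1))).toDS()
  // => java.lang.UnsupportedOperationException: No Encoder found for java.io.Serializable
}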

Here is a self-contained reproduction anyone can try:

import org.apache.spark.sql.{Dataset, SparkSession}

object SerdeTest extends App {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()


  import sparkSession.implicits._
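  // The values below are boxed to java.lang.Integer, i.e. a java.io.Serializable;
  // deriving a Dataset encoder for that interface type is what fails.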
  val ret: Dataset[Record] = sparkSession.sparkContext.parallelize(0 to 10)
    .map(
      t => {
        val row = (0 to t).map(
          i => i -> i.asInstanceOf[Integer]
        ).toMap

        Record(map = row)
      }
    ).toDS()

  val repartitioned = ret.repartition(10)


  repartitioned.collect.foreach(println)
}

case class Record(map: Map[Int, java.io.Serializable])

The above code will give you the Encoder not found error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.io.Serializable
- map value class: "java.io.Serializable"
- field (class: "scala.collection.immutable.Map", name: "map")

Answer

Found the answer. One way to solve this is to use the Kryo serde framework. The code change is minimal: just make an implicit Encoder using Kryo and bring it into scope wherever serialization is needed.

Here is the code example I got working (it can be run directly in IntelliJ or an equivalent IDE):

import org.apache.spark.sql._

object SerdeTest extends App {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()


  import sparkSession.implicits._

  // Define the Kryo Encoder for your custom type here: Record, which wraps Map[Int, java.io.Serializable]
  implicit val myObjEncoder: Encoder[Record] = org.apache.spark.sql.Encoders.kryo[Record]
  val ret: Dataset[Record] = sparkSession.sparkContext.parallelize(0 to 10)
    .map(
      t => {
        val row = (0 to t).map(
          i => i -> i.asInstanceOf[Integer]
        ).toMap

        Record(map = row)
      }
    ).toDS()

  val repartitioned = ret.repartition(10)


  repartitioned.collect.foreach(
    row => println(row.map)
  )
}

case class Record(map: Map[Int, java.io.Serializable])

This code will produce the expected results:

Map(0 -> 0, 5 -> 5, 1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
Map(0 -> 0, 1 -> 1, 2 -> 2)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 2 -> 2, 7 -> 7, 3 -> 3, 4 -> 4)
Map(0 -> 0, 1 -> 1)
Map(0 -> 0, 1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
Map(0 -> 0, 1 -> 1, 2 -> 2, 3 -> 3)
Map(0 -> 0)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 2 -> 2, 3 -> 3, 4 -> 4)
Map(0 -> 0, 5 -> 5, 10 -> 10, 1 -> 1, 6 -> 6, 9 -> 9, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 9 -> 9, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)
Map(0 -> 0, 5 -> 5, 1 -> 1, 6 -> 6, 2 -> 2, 7 -> 7, 3 -> 3, 8 -> 8, 4 -> 4)
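
A caveat to be aware of with this approach: Encoders.kryo serializes the whole Record into a single binary column, so the Dataset loses its columnar schema and column-based operations on the map are not available. A quick check, continuing the example above:

// With the Kryo encoder the whole Record becomes one binary field:
repartitioned.printSchema()
// root
//  |-- value: binary (nullable = true)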
