How to let Spark serialize an object using Kryo?

Question

I'd like to pass an object from the driver node to other nodes where an RDD resides, so that each partition of the RDD can access that object, as shown in the following snippet.

package xt

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

object HelloSpark {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                .setAppName("Testing HelloSpark")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")

        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 20, 4)
        val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

        rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
            .collect()
            .foreach(println)

        sc.stop
    }
}

// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) = {
        kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
    }
}

//My serializer 
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
        output.writeInt(obj.getLength)
        output.writeInt(obj.getOffset)
        output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
    }

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
        val length = input.readInt()
        val offset = input.readInt()
        val bytes  = new Array[Byte](length)
        // the stream holds exactly `length` payload bytes; read them into the
        // start of the new array (the stored offset is not an index into it)
        input.readBytes(bytes)

        new ImmutableBytesWritable(bytes)
    }
}

In the snippet above, I tried to serialize ImmutableBytesWritable with Kryo in Spark, so I did the following:

  1. Configure the SparkConf instance passed to the Spark context, i.e., set "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" and "spark.kryo.registrator" to "xt.HelloKryoRegistrator";
  2. Write a custom Kryo registrator class in which I register the class ImmutableBytesWritable;
  3. Write a serializer for ImmutableBytesWritable.

However, when I submitted my Spark application in yarn-client mode, the following exception was thrown:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at xt.HelloSpark$.main(HelloSpark.scala:23)
    at xt.HelloSpark.main(HelloSpark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

It seems that ImmutableBytesWritable can't be serialized by Kryo. So what is the correct way to let Spark serialize an object using Kryo? Can Kryo serialize any type?

Answer

This is happening because you're using ImmutableBytesWritable in your closure. Spark doesn't support closure serialization with Kryo yet; it only uses Kryo for the objects inside RDDs. This related question may help you solve the problem:

Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
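
To see what "only objects in RDDs" means in practice, here is a minimal sketch (not from the original answer; it assumes the SparkConf and HelloKryoRegistrator from the question): once ImmutableBytesWritable is registered, instances that are elements of an RDD do go through Kryo, for example when the RDD is cached in serialized form, even though the same class fails when captured by the map closure.

import org.apache.spark.storage.StorageLevel

// The ImmutableBytesWritable objects are created on the executors and are
// elements of the RDD, so nothing non-serializable is captured in the closure.
val writableRdd = sc.parallelize(1 to 20, 4)
    .map(i => new ImmutableBytesWritable(Bytes.toBytes("record-" + i)))

// Serialized caching goes through spark.serializer, i.e. the registered
// Kryo serializer, because the objects being stored are RDD elements.
writableRdd.persist(StorageLevel.MEMORY_ONLY_SER)
println(writableRdd.count())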

You simply need to serialize the objects before passing them through the closure, and de-serialize them afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some currying. ;)

Here is an example sketch (Foo, Bar, and MyMapFunction below are placeholders; KryoSerializationWrapper comes from the linked answer):

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
             (foo: Foo): Bar = {
    kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(
    new MyMapFunction(new ImmutableBytesWritable(Bytes.toBytes("This is a test"))))) _
rdd.flatMap(mapper).collectAsMap()

// The real mapping logic lives in a class (an object cannot take constructor
// parameters); it captures the ImmutableBytesWritable and acts as the function.
class MyMapFunction(bytes: ImmutableBytesWritable) extends (Foo => Bar) {
    def apply(foo: Foo): Bar = {
        // This is the real function
        ???
    }
}
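
The currying is the key here: the partially applied genMapper(...) _ captures only the KryoSerializationWrapper, which (in the linked answer) is designed to be Java-serializable, so Spark's closure cleaner is satisfied. When the task is shipped, the wrapper serializes the wrapped function, together with the ImmutableBytesWritable it holds, with Kryo, and kryoWrapper.value deserializes it again on the executors. The non-Serializable class therefore never passes through Java serialization.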
