Understanding Spark's closures and their serialization

Question

Disclaimer: just starting to play with Spark.

I'm having trouble understanding the famous "Task not serializable" exception, but my question is a little different from those I see on SO (or so I think).

I have a tiny custom RDD (TestRDD). It has a field which stores objects whose class does not implement Serializable (NonSerializable). I've set the "spark.serializer" config option to use Kryo. However, when I try count() on my RDD, I get the following:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)

When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, and that it's the Java serializer, not the Kryo serializer I'd expect. I've read that Kryo has issues serializing closures and that Spark always uses the Java serializer for closures, but I don't quite understand how closures come into play here at all. All I'm doing here is this:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

That is, no mappers or anything which would require serialization of closures. OTOH this works:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()

The Kryo serializer is used as expected, and the closure serializer is not involved. If I didn't set the serializer property to Kryo, I'd get an exception here as well.

I'd appreciate any pointers explaining where the closure comes from and how to ensure that I can use Kryo to serialize custom RDDs.

UPDATE: here's TestRDD with its non-serializable field mNS:

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;

import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable(); // the non-serializable field that the (Java) closure serializer trips over

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}
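
NonSerializable itself isn't shown in the question. For the snippet above to compile, it can be assumed to be any class that does not implement java.io.Serializable, for example this hypothetical stand-in:

public class NonSerializable {
    // deliberately does NOT implement java.io.Serializable
    private final byte[] mState = new byte[64];
}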

Answer

When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer which I'd expect.

SparkEnv supports two serializers: one named serializer, which is used for serialization of your data, checkpointing, messaging between workers, etc., and is available under the spark.serializer configuration flag. The other is called closureSerializer, under spark.closure.serializer, which is used to check that your object is in fact serializable; it is configurable for Spark <= 1.6.2 (but nothing other than JavaSerializer actually works) and is hardcoded to JavaSerializer from 2.0.0 and above.
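
As a rough sketch of where those two knobs live (assuming Spark <= 1.6.2, where spark.closure.serializer is still read; per the above, only the Java serializer actually works for closures anyway):

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         // data serializer: records, shuffle, checkpointing, messaging between workers
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                         // closure/task serializer: configurable only up to 1.6.2, hardcoded to Java from 2.0.0
                         .set("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer");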

The Kryo closure serializer has a bug which makes it unusable; you can see that bug under SPARK-7708 (this may be fixed with Kryo 3.0.0, but Spark is currently pinned to a specific version of Chill, which is in turn pinned to Kryo 2.2.1). Further, for Spark 2.0.x the JavaSerializer is now fixed instead of configurable (you can see it in this pull request). This means that effectively we're stuck with the JavaSerializer for closure serialization.
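
To see concretely why that bites the TestRDD above: when a task is submitted, the RDD instance itself ends up in the object graph handed to the (Java) closure serializer, and plain Java serialization walks every field and throws on the first non-serializable one. Here is a minimal, Spark-free sketch of that failure mode (conceptual only, not Spark's actual code path; the class names are made up):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationSketch {

    // Stand-in for the question's NonSerializable class.
    static class NonSerializable { }

    // Serializable container holding a non-serializable field,
    // mirroring TestRDD and its mNS field.
    static class TaskLikeContainer implements Serializable {
        final NonSerializable mNS = new NonSerializable();
    }

    public static void main(String[] args) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new TaskLikeContainer());
        } catch (NotSerializableException e) {
            // Same failure mode as the stack trace in the question.
            System.err.println("java.io.NotSerializableException: " + e.getMessage());
        }
    }
}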

Is it weird that we're using one serializer to submit tasks and another to serialize data between workers and such? Definitely, but this is what we have.

To sum up, if you're setting the spark.serializer configuration or using SparkContext.registerKryoClasses, you'll be utilizing Kryo for most of your serialization in Spark. Having said that, for checking whether a given class is serializable and for serializing tasks to workers, Spark will use the JavaSerializer.
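
Putting that together for the question's classes, a sketch of the resulting split (note: the registration call below is the SparkConf variant of registerKryoClasses from the Java API, and registration is optional here since Kryo can handle unregistered classes by default):

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                         .registerKryoClasses(new Class<?>[] { NonSerializable.class });

JavaSparkContext sc = new JavaSparkContext(conf);

// Data path: records are serialized with Kryo, so this works.
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count();

// Task path: the RDD object itself is serialized with the Java closure serializer,
// so TestRDD's non-serializable mNS field still fails regardless of the Kryo setup.
new TestRDD(sc.sc()).count();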
