Serializing RDD

Problem description

I have an RDD that I am trying to serialize and then reconstruct by deserializing it. I want to see whether this is possible in Apache Spark.

    static JavaSparkContext sc = new JavaSparkContext(conf);
    static SerializerInstance si = SparkEnv.get().closureSerializer().newInstance();
    static ClassTag<JavaRDD<String>> tag = scala.reflect.ClassTag$.MODULE$.apply(JavaRDD.class);
    ..
    ..
    JavaRDD<String> rdd = sc.textFile(logFile, 4);
    System.out.println("Element 1 " + rdd.first());

    // Serialize the RDD object itself with Spark's closure serializer,
    // then deserialize it back into a new JavaRDD.
    ByteBuffer bb = si.serialize(rdd, tag);
    JavaRDD<String> rdd2 = si.deserialize(bb, Thread.currentThread().getContextClassLoader(), tag);

    System.out.println(rdd2.partitions().size());
    System.out.println("Element 0 " + rdd2.first()); // exception thrown here

I get an exception on the last line when I perform an action on the newly created RDD. The way I am serializing is similar to how it is done internally in Spark.

Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1177)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
    at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:477)
    at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
    at SimpleApp.sparkSend(SimpleApp.java:63)
    at SimpleApp.main(SimpleApp.java:91)

The RDD is created and loaded within the same process, so I don't understand how this error happens.

Recommended answer

I'm the author of this warning message.

Spark does not support performing actions and transformations on copies of RDDs that are created via deserialization. RDDs are serializable so that certain methods on them can be invoked in executors, but end users shouldn't try to manually perform RDD serialization.
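
If the goal is simply to persist an RDD and rebuild it later, one supported route is to serialize the RDD's data rather than the RDD object itself, e.g. with saveAsObjectFile and objectFile. A minimal sketch (the output path here is only an illustration):

    // Write the RDD's contents (not the RDD object) to durable storage...
    rdd.saveAsObjectFile("/tmp/rdd-snapshot");  // hypothetical path
    // ...and later rebuild an equivalent RDD from that data on the driver:
    JavaRDD<String> restored = sc.objectFile("/tmp/rdd-snapshot");
    System.out.println("Element 0 " + restored.first());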

When an RDD is serialized, it loses its reference to the SparkContext that created it, preventing jobs from being launched with it (see https://github.com/apache/spark/blob/a38774b8892a85184520078a2187e9ce2a190038/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L79). In earlier versions of Spark, your code would result in a NullPointerException when Spark tried to access the private, null RDD.sc field.
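
This follows ordinary JVM serialization semantics: a field marked transient (as the RDD's SparkContext reference is) is skipped during serialization and comes back as null after a round trip. A tiny self-contained illustration of that general behavior, not Spark code (Holder is a made-up class):

    import java.io.*;

    class Holder implements Serializable {
        // Stands in for RDD's transient SparkContext reference.
        transient Object context = new Object();

        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(new Holder());
            ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()));
            Holder copy = (Holder) in.readObject();
            System.out.println(copy.context);  // prints "null": the reference was dropped
        }
    }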

This error message was worded this way because users were frequently running into confusing NullPointerExceptions when trying to do things like rdd1.map { _ => rdd2.count() }, which caused actions to be invoked on deserialized RDDs on executor machines. I didn't anticipate that anyone would try to manually serialize / deserialize their RDDs on the driver, so I can see how this error message could be slightly misleading.
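
To make that concrete, here is a minimal sketch of the pattern the message guards against, together with the usual fix of running the inner action on the driver first (the data and variable names are illustrative, reusing the sc from the question):

    JavaRDD<Integer> rdd1 = sc.parallelize(java.util.Arrays.asList(1, 2, 3));
    JavaRDD<Integer> rdd2 = sc.parallelize(java.util.Arrays.asList(10, 20));

    // Invalid: the closure captures rdd2, which gets serialized to the executors;
    // calling count() on the deserialized copy raises the SPARK-5063 error:
    // rdd1.map(x -> rdd2.count() * x);

    // Valid: run the action on the driver, then close over the plain value.
    long n = rdd2.count();                        // executed on the driver
    JavaRDD<Long> scaled = rdd1.map(x -> n * x);  // closure captures a long, not an RDD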
