Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?


Problem description

See this question: Scala + Spark - Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes, not objects.

Question:

Suppose my mappers can be functions (def) that internally call other classes, create objects, and do different things inside. (Or they can even be classes that extend (Foo) => Bar and do the processing in their apply method - but let's ignore this case for now.)

Spark supports only Java serialization for closures. Is there ANY way out of this? Can we use something instead of closures to do what I want to do? We can easily do this sort of thing with Hadoop. This single thing is making Spark almost unusable for me. One cannot expect all 3rd-party libraries to have all their classes extend Serializable!
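To make the failure mode concrete, here is a minimal self-contained sketch (class and method names are hypothetical, and a plain ObjectOutputStream stands in for what Spark does when it ships a closure to executors): a function value that captures a non-Serializable object cannot be Java-serialized, which is exactly what surfaces as "Task not serializable" in Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a 3rd-party class that does NOT implement Serializable
// (the name is hypothetical).
class ThirdPartyHelper { def transform(s: String): String = s.toUpperCase }

object ClosureDemo {
  // Spark does essentially this when it ships a closure to executors.
  def javaSerializes(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val helper = new ThirdPartyHelper // lives on the driver
    // The function value captures `helper`, just like a Spark map closure;
    // Java serialization then chokes on the captured non-Serializable field.
    val mapper: String => String = s => helper.transform(s)
    println(s"closure serializes: ${javaSerializes(mapper)}")
  }
}
```

A closure that captures nothing (e.g. `(s: String) => s.toUpperCase`) serializes fine; the problem appears only once a non-Serializable object is pulled into the closure's environment.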

Possible solution:

Does something like this seem to be of any use: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

It certainly seems like a wrapper is the answer, but I cannot see exactly how.

Recommended answer

I figured out how to do it myself!

You simply need to serialize the objects before passing them through the closure, and de-serialize them afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some currying. ;)

Here's an example of how I did it:

// Curried: the first parameter list is applied on the driver, so only the
// wrapper (not the raw Foo => Bar) is captured by the Spark closure.
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
              (foo: Foo): Bar = {
    kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

// Note: this must be a class, not an object - an object cannot take
// constructor parameters.
class Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo): Bar = {
        ??? // This is the real function
    }
}

Feel free to make Blah as complicated as you want: a class, a companion object, nested classes, references to multiple 3rd-party libs.

KryoSerializationWrapper refers to: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
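To show what such a wrapper does internally, here is a minimal self-contained sketch of the same idea. Note the substitution: the real Shark class uses Kryo (which is what lets non-Serializable payloads through), while this sketch uses plain Java serialization purely to illustrate the mechanics, so it only works for Serializable payloads; all names here are illustrative, not the Shark code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
                ObjectInputStream, ObjectOutputStream}

// Sketch of the wrapper pattern: hold the payload as a byte array so that
// the bytes, not the object itself, travel through the Spark closure.
class SerializationWrapper[T](@transient private var obj: T) extends Serializable {
  // Serialized eagerly on the driver. In the real wrapper this step uses
  // Kryo, which does not require T to be Serializable.
  private val bytes: Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray
  }

  // `obj` is transient, so after the wrapper is shipped it arrives as null
  // and is rehydrated lazily from the bytes on first access.
  def value: T = {
    if (obj == null) {
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      obj = in.readObject().asInstanceOf[T]
    }
    obj
  }
}

object SerializationWrapper {
  def apply[T](obj: T): SerializationWrapper[T] = new SerializationWrapper(obj)
}
```

Combined with the curried genMapper above, the closure only ever captures the wrapper, whose serialized form is just the byte array plus bookkeeping - which is why the payload's own (non-)Serializable status stops mattering once Kryo does the byte-array step.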

