How does Spark send closures to workers?


Question

When I write an RDD transformation, e.g.

val rdd = sc.parallelize(1 to 1000)
rdd.map(x => x * 3)

I understand that the closure (x => x * 3), which is simply a Function1, needs to be Serializable, and I think I read somewhere that the documentation implies it: http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark — the closure is "sent" to the workers for execution (e.g. Akka sending an "executable piece of code" down the wire for the workers to run).
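
As an aside, it is easy to convince yourself outside of Spark that such a closure is serializable at all: Scala compiles function literals to serializable Function1 classes, so plain JVM serialization can already round-trip them. The standalone sketch below is purely illustrative and does not use Spark's own serialization machinery.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustrative sketch only (plain JVM serialization, not Spark's code path).
// The function literal compiles to a serializable Function1, so it can be
// written to a byte stream and read back, as long as it captures nothing
// non-serializable.
object ClosureRoundTrip {
  def main(args: Array[String]): Unit = {
    val f: Int => Int = x => x * 3

    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(f)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val g = in.readObject().asInstanceOf[Int => Int]
    println(g(10)) // prints 30
  }
}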

How does that work?

Someone at a meetup I attended commented that it is not actually sending any serialized code; since each worker gets a "copy" of the jar anyway, it only needs a reference to which function to run, or something like that (but I'm not sure I'm quoting that person correctly).

I'm now utterly confused about how this actually works.

So my questions are:

  1. How are transformation closures sent to workers? Serialized via Akka? Or are they "already there" because Spark sends the entire uber jar to each worker (which sounds unlikely to me...)?

  2. If so, how is the rest of the jar sent to the workers? Is that what "cleanupClosure" does, e.g. sending only the relevant bytecode to the worker instead of the entire uber jar (i.e. only the code the closure depends on)?

  3. To summarise: does Spark, at any point, somehow sync the jars on the --jars classpath with the workers? Or does it send "just the right amount" of code to the workers? And if it does send closures, are they cached in case they need to be recomputed, or is the closure sent with the task every time a task is scheduled? Sorry if these are silly questions, but I really don't know.

Please add sources to your answer if you can; I couldn't find this stated explicitly in the documentation, and I'm too wary to try to conclude it just by reading the code.

Answer

The closures are most certainly serialized at runtime. I have seen plenty of Closure Not Serializable exceptions at runtime, from both PySpark and Scala. There is complex code in ClosureCleaner.scala:

def clean(
    closure: AnyRef,
    checkSerializable: Boolean = true,
    cleanTransitively: Boolean = true): Unit = {
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

It attempts to minimize the code being serialized. The cleaned code is then sent across the wire, provided it is serializable; otherwise an exception is thrown.

Here is another excerpt from ClosureCleaner that checks whether an incoming function can be serialized:

  private def ensureSerializable(func: AnyRef) {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }
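
The SparkException above is exactly what you hit when a closure accidentally drags in a non-serializable enclosing object. The snippet below is a hypothetical sketch of that failure mode and of the usual workaround (copying the needed field into a local val), in the spirit of the "Passing Functions to Spark" section linked in the question; the Multiplier class is made up for illustration and is not Spark code.

import org.apache.spark.SparkContext

// Hypothetical example, not Spark source. `factor` is a field of Multiplier, so
// x => x * factor really means x => x * this.factor: the closure captures `this`,
// and because Multiplier is not Serializable the task cannot be serialized.
class Multiplier(factor: Int) {

  def scale(sc: SparkContext) =
    sc.parallelize(1 to 1000).map(x => x * factor)   // SparkException: Task not serializable

  def scaleFixed(sc: SparkContext) = {
    val f = factor                                   // copy the field into a local val
    sc.parallelize(1 to 1000).map(x => x * f)        // the closure now captures only an Int
  }
}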
