What is the right way to have a static object on all workers


Question


I've been looking at the documentation for spark and it mentions this:

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

Anonymous function syntax, which can be used for short pieces of code.
Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1) 

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

class MyClass {   
  def func1(s: String): String = { ... }   
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } 
} 

Here, if we create a new MyClass and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).

Now my doubt is what happens if you have attributes on the singleton object (which are supposed to be equivalent to static). Same example with a small alteration:

object MyClass {   
  val value = 1   
  def func1(s: String): String = { s + value }   
} 

myRdd.map(MyClass.func1) 

So the function is still referenced statically, but how far does Spark go in trying to serialize all referenced variables? Will it serialize value, or will it be initialized again on the remote workers?

Additionally, this is all in the context that I have some heavy models inside a singleton object, and I would like to find the correct way to serialize them to the workers while keeping the ability to reference them from the singleton everywhere, instead of passing them around as function parameters through a pretty deep call stack.
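For concreteness, here is a minimal sketch of the kind of setup I mean (Models, heavyModel and score are just made-up names for illustration):

import org.apache.spark.rdd.RDD

object Models {
  // stand-in for an expensive load (disk, network, deserialization, ...)
  val heavyModel: Map[String, Double] = Map("a" -> 1.0, "b" -> 2.0)

  def score(s: String): Double = heavyModel.getOrElse(s, 0.0)
}

// referenced from deep inside the job, without threading the model through as a parameter
def scoreAll(rdd: RDD[String]): RDD[Double] = rdd.map(Models.score)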

Any in-depth information on what/how/when Spark serializes things would be appreciated.

Solution

This is less a question about Spark and more a question of how Scala generates code. Remember that a Scala object is pretty much a Java class full of static methods. Consider a simple example like this:

object foo {

  val value = 42

  def func(i: Int): Int = i + value

  def main(args: Array[String]): Unit = {
    println(Seq(1, 2, 3).map(func).sum)
  }

}

That will be translated to 3 Java classes; one of them will be the closure that is a parameter to the map method. Using javap on that class yields something like this:

public final class foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final int apply(int);
  public int apply$mcII$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public foo$$anonfun$main$1();
}

Note there are no fields or anything. If you look at the disassembled bytecode, all it does is call the func() method. When running in Spark, this is the instance that will get serialized; since it has no fields, there's not much to be serialized.

As for your question of how to initialize static objects, you can have an idempotent initialization function that you call at the start of your closures. The first call will trigger initialization; subsequent calls will be no-ops. Cleanup, though, is a lot trickier, since I'm not familiar with an API that does something like "run this code on all executors".
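A minimal sketch of that idea, assuming the model can live in a mutable field on the singleton (all the names here are made up for illustration):

object Models {
  private var model: Map[String, Double] = null

  // Idempotent: the first call on a given JVM loads the model, later calls do nothing.
  def init(): Unit = synchronized {
    if (model == null) {
      model = Map("a" -> 1.0, "b" -> 2.0) // stand-in for an expensive load
    }
  }

  def score(s: String): Double = model.getOrElse(s, 0.0)
}

myRdd.map { x =>
  Models.init() // runs on the executor; a no-op after the first call in that JVM
  Models.score(x)
}

(A plain val or lazy val on the object gives you much the same behaviour, since the object itself is only initialized on first access in each JVM; the explicit init() just makes the moment of initialization easier to control.)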

One approach that can be useful if you need cleanup is explained in this blog, in the "setup() and cleanup()" section.
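A common way to express the setup()/cleanup() idea is with mapPartitions, roughly like the sketch below; DummyConnection is a hypothetical stand-in for whatever per-partition resource you need.

class DummyConnection {
  def lookup(s: String): String = s.toUpperCase
  def close(): Unit = ()
}

val mapped = myRdd.mapPartitions { iter =>
  val conn = new DummyConnection()            // setup, once per partition
  // materialize the partition so the connection can be closed before returning
  val results = iter.map(conn.lookup).toList
  conn.close()                                // cleanup
  results.iterator
}

Materializing the whole partition with toList is the blunt way to guarantee that cleanup runs; lazier variants are possible but need more care about when the iterator is actually exhausted.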

EDIT: just for clarification, here's the disassembly of the method that actually makes the call.

public int apply$mcII$sp(int);
  Code:
   0:   getstatic       #29; //Field foo$.MODULE$:Lfoo$;
   3:   iload_1
   4:   invokevirtual   #32; //Method foo$.func:(I)I
   7:   ireturn

See how it just references the static field holding the singleton and calls the func() method.
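If you want to convince yourself of that, one quick, hypothetical check is to record where and when the singleton was initialized and compare the values across tasks; on a real cluster each executor JVM reports its own, because the object is initialized locally rather than serialized from the driver.

object WhereAmI {
  // evaluated once per JVM, when the object is first touched on that JVM
  val initializedAt: String =
    java.net.InetAddress.getLocalHost.getHostName + " @ " + System.currentTimeMillis()
}

myRdd.map(_ => WhereAmI.initializedAt).distinct().collect().foreach(println)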
