Spark closure argument binding


Problem Description


I am working with Apache Spark in Scala.

I have a problem when trying to manipulate one RDD with data from a second RDD. I am trying to pass the second RDD as an argument to a function being 'mapped' over the first RDD, but the closure created for that function seems to bind an uninitialized version of that value.

Following is a simpler piece of code that shows the type of problem I'm seeing. (My real example, where I first ran into trouble, is larger and harder to follow.)

I don't really understand the argument binding rules for Spark closures.

What I'm really looking for is a basic approach or pattern for how to manipulate one RDD using the content of another (which was previously constructed elsewhere).

In the following code, calling Test1.process(sc) will fail with a null pointer access in findSquare (as the second argument bound in the closure is not initialized).

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object Test1 {

  def process(sc: SparkContext) {
    val squaresMap = (1 to 10).map(n => (n, n * n))
    val squaresRDD = sc.parallelize(squaresMap)

    val primes = sc.parallelize(List(2, 3, 5, 7))

    // This loop runs findSquare inside an RDD operation; squaresRDD is not
    // usable from within it, which is where the failure occurs.
    for (p <- primes) {
      println("%d: %d".format(p, findSquare(p, squaresRDD)))
    }
  }

  def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = {
    squaresRDD.filter(kv => kv._1 == n).first._1
  }
}

Solution

The problem you are experiencing has nothing to do with closures or RDDs, which, contrary to popular belief, are serializable.

It simply breaks a fundamental Spark rule, which states that you cannot trigger an action or transformation from another action or transformation*; different variants of this question have been asked on SO multiple times.

To understand why that's the case you have to think about the architecture:

  • SparkContext is managed on the driver
  • everything that happens inside transformations is executed on the workers. Each worker has access only to its own part of the data and does not communicate with other workers** (a sketch of this split follows below).
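
To make the split concrete, here is a minimal sketch (the variable names are purely illustrative, and sc is assumed to be a live SparkContext):

val data = sc.parallelize(1 to 10)   // driver: only records the lineage
val doubled = data.map(_ * 2)        // the function passed to map runs on workers
doubled.foreach { n =>
  // worker side: there is no usable SparkContext here, and calling methods
  // of another RDD from this closure is exactly what fails in Test1.process
  println(n)                         // prints to executor stdout, not the driver
}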

If you want to use the content of multiple RDDs, you have to use one of the transformations which combine RDDs, such as join, cartesian, zip or union.
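
For instance, zip and union (the two not demonstrated later in this answer) look like this; a toy sketch with made-up values:

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(10, 20, 30))

val pairs = a.zip(b)    // RDD[(Int, Int)]: (1,10), (2,20), (3,30); requires equal
                        // partition counts and equal elements per partition
val all = a.union(b)    // RDD[Int] containing all six elements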

Here you most likely (I am not sure why you pass a tuple and use only the first element of it) want to use either a broadcast variable:

val squaresMapBD = sc.broadcast(squaresMap)

def findSquare(n: Int): Seq[(Int, Int)] = {
  squaresMapBD.value                  // local copy of the broadcast data
    .filter { case (k, v) => k == n }
    .map { case (k, v) => (n, k) }
    .take(1)
}

primes.flatMap(findSquare)

or cartesian:

primes
  .cartesian(squaresRDD)
  .filter { case (n, (k, _)) => n == k }
  .map { case (n, (k, _)) => (n, k) }

Converting primes to dummy pairs (Int, null) and using join would be more efficient:

primes.map((_, null)).join(squaresRDD).map(...)

but based on your comments I assume you're interested in a scenario where there is no natural join condition.
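
For completeness, one plausible way to fill in the .map(...) above (purely an illustration, not the only option) that yields (prime, square) pairs:

primes
  .map((_, null))                         // key the primes so they can be joined
  .join(squaresRDD)                       // (p, (null, square)) for matching keys
  .map { case (p, (_, sq)) => (p, sq) }   // keep (prime, square)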

Depending on the context, you can also consider using a database or files to store common data.

On a side note, RDDs are not iterable, so you cannot simply use a for loop. To be able to do something like this you have to collect or convert with toLocalIterator first. You can also use the foreach method.
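
For example (a sketch; the first two variants materialize data on the driver, so they are only sensible for small RDDs):

for (p <- primes.collect()) println(p)        // fetch everything to the driver at once

for (p <- primes.toLocalIterator) println(p)  // stream roughly one partition at a time

primes.foreach(println)                       // runs on the workers; output lands in
                                              // executor logs, not the driver console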


* To be precise, you cannot access the SparkContext.

** Torrent broadcast and tree aggregates involve communication between executors, so it is technically possible.

