Why can't I update an array in cluster mode, but can in pseudo-distributed mode?

Problem description

I wrote a Spark program in Scala, of which the main code is:

val centers: Array[(Vector, Double)] = initCenters(k)
val sumsMap: Map[Int, (Vector, Int)] = data.mapPartitions {
    ***
}.reduceByKey(***).collectAsMap()
sumsMap.foreach { case (index, (sum, count)) =>
  sum /= count
  centers(index) = (sum, sum.norm2())
}

The original code is:

val centers = initCenters.getOrElse(initCenter(data))

val br_centers = data.sparkContext.broadcast(centers)
val trainData = data.map(e => (e._2, e._2.norm2)).cache()
val squareStopBound = stopBound * stopBound
var isConvergence = false
var i = 0
val costs = data.sparkContext.doubleAccumulator

while (!isConvergence && i < maxIters) {
  costs.reset()
  val res = trainData.mapPartitions { iter =>
    val counts = new Array[Int](k)
    util.Arrays.fill(counts, 0)
    val partSum = (0 until k).map(e => new DenseVector(br_centers.value(0)._1.size))

    iter.foreach { e =>
      val (index, cost) = KMeans.findNearest(e, br_centers.value)
      costs.add(cost)
      counts(index) += 1
      partSum(index) += e._1
    }
    counts.indices.filter(j => counts(j) > 0).map(j => (j -> (partSum(j), counts(j)))).iterator
  }.reduceByKey { case ((s1, c1), (s2, c2)) =>
    (s1 += s2, c1 + c2)
  }.collectAsMap()
  br_centers.unpersist(false)


  println(s"cost at iter: $i is: ${costs.value}")
  isConvergence = true
  res.foreach { case (index, (sum, count)) =>
    sum /= count
    val sumNorm2 = sum.norm2()
    val squareDist = math.pow(centers(index)._2, 2.0) + math.pow(sumNorm2, 2.0) - 2 * (centers(index)._1 * sum)
    if (squareDist >= squareStopBound) {
      isConvergence = false
    }
    centers.update(index,(sum, sumNorm2))
  }
  i += 1
}

When this code runs in pseudo-distributed mode in IDEA, the centers get updated, but when it runs on a Spark cluster, they do not.

Answer

LostInOverflow's answer is correct, but not especially descriptive as to what's going on.

Here are some important properties of your code:

  1. You declare an array centers
  2. You broadcast this array as br_centers
  3. You update centers iteratively

So how is this going wrong? Well, broadcasts are static. If I write:

val a = Array(1,2,3)
val aBc = sc.broadcast(a)
a(0) = 67

and access aBc.value(0), I'm going to get different results depending on whether this code was run on the driver JVM or not. Broadcasting takes an object, torrents it across the network to each node, and creates a new reference in each JVM. That reference reflects the object as it was when it was broadcast, and it is NOT updated in real time as you mutate the base object.
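
To make that concrete, here is a minimal sketch (assuming an existing SparkContext named sc; the parallelized range is just hypothetical data) of why the mutation appears to work within a single JVM but not across a cluster:

val a = Array(1, 2, 3)
val aBc = sc.broadcast(a)   // executors get a snapshot of `a` as it is at this point
a(0) = 67                   // this mutation only touches the driver-local array

// In local / pseudo-distributed mode the driver and executors share one JVM and can
// end up reading the very same object, so the mutation appears to be visible.
// On a real cluster each executor JVM deserializes its own copy of the snapshot,
// so the values collected below remain 1 rather than 67.
val seen = sc.parallelize(1 to 4, 4).map(_ => aBc.value(0)).collect()
println(seen.mkString(", "))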

What's the solution? I think moving the broadcast inside the while loop, so that you broadcast the updated centers, should work:

while (!isConvergence && i < maxIters) {
  val br_centers = data.sparkContext.broadcast(centers)
  ...
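
For completeness, here is a small, self-contained sketch of the re-broadcast-per-iteration pattern on a toy one-dimensional k-means (hypothetical names and data, not the code from the question), just to show the shape of the fix:

import org.apache.spark.sql.SparkSession

// Toy 1-D k-means: re-broadcast the driver-side centers at the start of every iteration.
val spark = SparkSession.builder().appName("rebroadcast-sketch").getOrCreate()
val sc = spark.sparkContext

val points = sc.parallelize(1 to 100, 4).map(_.toDouble).cache()
val centers = Array(10.0, 50.0, 90.0)   // driver-side state, updated every iteration

for (iter <- 0 until 5) {
  // Broadcast the *current* centers so executors see this iteration's values.
  val brCenters = sc.broadcast(centers)

  // Assign each point to its nearest center, then sum and count per center.
  val sums = points.map { x =>
    val idx = brCenters.value.indices.minBy(i => math.abs(x - brCenters.value(i)))
    (idx, (x, 1L))
  }.reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
   .collectAsMap()

  // Update the driver-side array; executors will only see the change after the next broadcast.
  sums.foreach { case (idx, (sum, count)) => centers(idx) = sum / count }

  // This iteration's broadcast is no longer needed once results are collected.
  brCenters.unpersist(blocking = false)
  println(s"iter $iter: centers = ${centers.mkString(", ")}")
}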
