Spark: Difference between accumulator and local variable


Question



While exploring Spark accumulators, I tried to understand and demonstrate the difference between an accumulator and a regular variable in Spark, but the output does not match my expectation. Both the accumulator and the counter have the same value at the end of the program, and I am able to read the accumulator inside the transformation function (per the docs, only the driver should be able to read an accumulator). Am I doing something wrong? Is my understanding correct?

import org.apache.spark.sql.SparkSession

object Accumulators extends App {

  val spark = SparkSession.builder().appName("Accumulators").master("local[3]").getOrCreate()

  // long accumulator registered with the driver
  val cntAccum = spark.sparkContext.longAccumulator("counterAccum")

  val file = spark.sparkContext.textFile("resources/InputWithBlank")

  // plain driver-side variable, captured by the closure below
  var counter = 0

  def countBlank(line: String): Array[String] = {
    val trimmed = line.trim
    if (trimmed == "") {
      cntAccum.add(1)
      cntAccum.value // reading the accumulator inside a transformation
      counter += 1
    }
    line.split(" ")
  }

  file.flatMap(line => countBlank(line)).collect()

  println(cntAccum.value)

  println(counter)
}

The input file has text with 9 empty lines in between:

4) Aggregations and Joins

5) Spark SQL

6) Spark App tuning

Output:

Both counter and cntAccum give the same result (9).

Solution

counter is a local variable, and it only appears to work here because .master("local[3]") runs everything on the driver. Imagine you are running in yarn mode instead: the logic executes in a distributed way, and each executor updates its own copy of the local variable, so the driver's copy is never updated. The accumulator, however, is a distributed variable and will be updated. Suppose two executors are running the program: each sends its updates back, and the driver sees the combined latest value. In yarn (distributed) mode, cntAccum is capable of gathering the latest value from all executors, whereas the local variable counter cannot.

This is because accumulators support both writes (from executors) and reads (on the driver); see the Spark docs on accumulators.

(In the original screenshot of the Spark UI, the executor ID is localhost; if you run on yarn with 2-3 executors, it will show the real executor IDs.) Hope that helps.
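The driver-vs-executor copy semantics above can be sketched without Spark. The following is a minimal Python analogy (not Spark code, and the variable names are my own): each worker process gets its own copy of the plain `counter`, much as each executor gets a copy of the serialized closure, while a shared `multiprocessing.Value` stands in for the accumulator that the parent "driver" process can read afterwards. (In `local[3]` mode, Spark tasks run as threads inside the driver JVM, which is why `counter` was updated in the question; separate executor JVMs behave more like the separate processes here.)

```python
# A Spark-free sketch of the same behavior, using Python's multiprocessing
# as an analogy (this is NOT Spark code): each worker process mutates its
# own copy of the plain `counter`, while a shared Value stands in for the
# accumulator.
import multiprocessing as mp

ctx = mp.get_context("fork")  # POSIX-only; children inherit parent state

counter = 0  # plain variable: each worker mutates its own private copy

def count_blank(lines, accum):
    global counter
    for line in lines:
        if line.strip() == "":
            counter += 1          # lost: only this worker's copy changes
            with accum.get_lock():
                accum.value += 1  # shared: visible to the parent "driver"

accum = ctx.Value("i", 0)  # plays the role of cntAccum
lines = ["4) Aggregations and Joins", "", "5) Spark SQL", "", ""]

workers = [ctx.Process(target=count_blank, args=(lines[:3], accum)),
           ctx.Process(target=count_blank, args=(lines[3:], accum))]
for w in workers: w.start()
for w in workers: w.join()

print(counter)      # 0  -- like `counter` in cluster mode
print(accum.value)  # 3  -- like cntAccum
```

The parent's `counter` stays 0 because each child incremented its own copy, while the shared value collected all 3 blank-line updates: exactly the split the answer describes between a closure-captured variable and an accumulator.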

