How to print accumulator variable from within task (seems to "work" without calling value method)?


Problem Description

I know the accumulator variables are 'write only' from the point of view of tasks, when they are in execution in worker nodes. I was doing some testing on this and I realized that I am able to print the accumulator value in the task.

Here I am initializing the accumulator in the driver:-

scala> val accum  = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123

Then I go on to define a function 'foo':-

scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)

In this function I am simply printing the accumulator and then I return the same pair that was received.

Now I have an RDD called myrdd with the following type:-

scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21

And I am now calling the map transformation on this RDD:-

myrdd.map(foo).collect

The 'collect' action is applied to force evaluation. So what actually happens here is that during this execution a zero (0) is printed for every line of the RDD. Since this RDD has 4 elements, it prints 0 four times. Since the action 'collect' is there, it also prints all the elements at the end, but that's not really the focus here. So I have two questions:-

  1. Logically, printing is equivalent to reading, because only when you can read something can you print it. So why is this allowed? Why was the exception not thrown (something that would definitely happen if we tried to 'return' the accumulator from the function)?
  2. Why is it printing 0 as the value of the accumulator, when we had initialized it as 123 in the driver?

After some experimentation I found that if I change the function definition to access the actual value property of the accumulator object (accum.value), and then trigger the RDD action as described, it does indeed throw the exception:-

scala> def foo(pair:(String,String)) = { println(accum.value); pair }

The exception caused during the RDD evaluation:-

Can't read accumulator value in the task

So what I was doing earlier was printing the accumulator object itself. But the question still remains as to why it printed 0, because at the driver level, if I issue the same command that I used in the function definition, I do indeed get the value 123:-

scala> println(accum)
123

I didn't have to say println(accum.value) for it to work. So why does it print 0 only when I issue this command in the function which the task uses?

Solution

Why is it printing 0 as the value of the accumulator, when we had initialized it as 123 in the driver?

Because the worker nodes never see the initial value. The only thing that is passed to the workers is the zero value, as defined in AccumulatorParam. For Accumulator[Int] it is simply 0. If you first update the accumulator, you'll see the updated local value:

val acc = sc.accumulator(123)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.foreach(i => {acc += i; println(acc)})

It is even clearer when you use a single partition:

rdd.repartition(1).foreach(i => {acc += i; println(acc)})
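
To make this concrete, here is a minimal sketch of a custom AccumulatorParam written against the Spark 1.x API the question uses (the object name is mine, not from the original post); it mirrors what the built-in Int parameter does and shows where the worker-side zero comes from:

import org.apache.spark.AccumulatorParam

object SketchIntParam extends AccumulatorParam[Int] {
  // What a freshly deserialized copy starts from on each worker; for the
  // built-in Int parameter this is 0, which is why the tasks print 0.
  def zero(initialValue: Int): Int = 0
  // How the partial results from the tasks are merged back on the driver.
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
}

// val acc = sc.accumulator(123)(SketchIntParam)   // behaves like the default Int accumulator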

Why was the exception not thrown (...)?

Because the exception is thrown when you access the value method (https://github.com/apache/spark/blob/812b63bbee8d0b30884f7a96b207e8834b774957/core/src/main/scala/org/apache/spark/Accumulators.scala#L113), and toString (https://github.com/apache/spark/blob/812b63bbee8d0b30884f7a96b207e8834b774957/core/src/main/scala/org/apache/spark/Accumulators.scala#L159) does not use it at all. Instead it uses the private value_ variable, the same one that is returned by value if the !deserialized check passes.
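
The following is a simplified sketch in the spirit of the linked Accumulators.scala (my own paraphrase, not the actual Spark source) showing why println(accum) succeeds inside a task while accum.value throws:

import java.io.ObjectInputStream

class AccumulableSketch[R](initialValue: R, zeroValue: R) extends Serializable {
  private var value_ : R = initialValue   // 123 on the driver
  private var deserialized = false

  // Runs when the serialized accumulator arrives in a task: the local copy
  // restarts from the zero value, never from the driver's initial value.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value_ = zeroValue
    deserialized = true
  }

  // accum.value: throws inside a task, because deserialized is true there.
  def value: R =
    if (!deserialized) value_
    else throw new UnsupportedOperationException("Can't read accumulator value in the task")

  // println(accum) goes through toString, which reads value_ directly and skips
  // the deserialized check, so on a worker it simply prints the local (zero) value.
  override def toString: String = if (value_ == null) "null" else value_.toString
}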
