Why does worker node not see updates to accumulator on another worker nodes?


Problem Description


I'm using a LongAccumulator as a shared counter in map operations. But it seems that I'm not using it correctly because the state of the counter on the worker nodes is not updated. Here's what my counter class looks like:

import java.io.Serializable;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Counter implements Serializable {

   private static final Logger log = LoggerFactory.getLogger(Counter.class);

   private LongAccumulator counter;

   public Long increment() {
      log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
      counter.add(1);
      Long value = counter.value();
      log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
      return value;
   }

   public Counter(JavaSparkContext javaSparkContext) {
      counter = javaSparkContext.sc().longAccumulator();
   }
}


As far as I can understand the documentation this should work fine when the application is run within multiple worker nodes:


Accumulators are variables that are only "added" to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.


But here is the result when the counter is incremented on 2 different workers, and it looks like the state is not shared between the nodes:


INFO Counter: Incrementing counter with id: 866 on thread: Executor task launch worker-6
INFO Counter: Counter's value with id: 866 is: 1 on thread: Executor task launch worker-6
INFO Counter: Incrementing counter with id: 866 on thread: Executor task launch worker-0
INFO Counter: Counter's value with id: 866 is: 1 on thread: Executor task launch worker-0


Do I misunderstand the accumulator concept, or is there a setting I must start the task with?

Answer


Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.


Each task has its own accumulator, which is updated locally and merged with the "shared" copy on the driver once the task has finished and its result has been reported.
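The merge model above can be sketched in plain Java, without a Spark dependency (the `LocalCounter` class and `runTasks` helper are hypothetical, for illustration only): each task increments its own local copy, so reading the value inside a task only ever shows that task's partial count, and the real total appears only after the driver merges the per-task copies.

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulatorModel {

    // Stand-in for a per-task accumulator copy (hypothetical, for illustration).
    static class LocalCounter {
        long count = 0;
        void add(long n) { count += n; }
        long value()     { return count; }   // sees only this task's increments
    }

    // Each "task" increments its own local copy; the driver merges afterwards.
    public static long runTasks(int[] incrementsPerTask) {
        List<LocalCounter> taskCopies = new ArrayList<>();
        for (int increments : incrementsPerTask) {
            LocalCounter local = new LocalCounter();
            for (int i = 0; i < increments; i++) {
                local.add(1);
            }
            // Inside the task, value() is only the local partial count: with
            // one increment per task, every task logs "value is: 1", which is
            // exactly the output shown in the question.
            taskCopies.add(local);
        }
        // "Driver side": merge the per-task copies to get the real total.
        long total = 0;
        for (LocalCounter copy : taskCopies) {
            total += copy.value();
        }
        return total;
    }

    public static void main(String[] args) {
        // Two tasks, one increment each: each task sees 1 locally,
        // but the merged driver-side total is 2.
        System.out.println(runTasks(new int[]{1, 1}));
    }
}
```

Applied to the question's `Counter` class, the consequence is that `counter.value()` should only be called on the driver, after the action that performs the `add` calls has completed.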


The old Accumulator API (now wrapping AccumulatorV2) actually threw an exception when value was used from within a task, but for some reason this check has been omitted in AccumulatorV2.


What you experience is actually similar to the old behavior described here: How to print accumulator variable from within task (seem to "work" without calling value method)?

