Spark Accumulator value not read by task
Problem Description
I'm initializing an accumulator:
final Accumulator<Integer> accum = sc.accumulator(0);
Then, inside a map function, I'm trying to increment the accumulator and then use the accumulator's value to set a variable:
JavaRDD<UserSetGet> UserProfileRDD1 = temp.map(new Function<String, UserSetGet>() {
    @Override
    public UserSetGet call(String arg0) throws Exception {
        UserSetGet usg = new UserSetGet();
        accum.add(1);
        usg.setPid(accum.value().toString()); // this value() call is what throws
        return usg;
    }
});
But I'm getting the following error:
16/03/14 09:12:58 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.UnsupportedOperationException: Can't read accumulator value in task
EDITED - As per the answer from Avihoo Mamka, getting the accumulator value in tasks is not possible.
So is there any way I can achieve the same thing in parallel, such that the Pid value gets set each time a variable (e.g. a static counter) is incremented in my map function?
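Since tasks cannot read the accumulator, one common workaround for generating a per-record Pid in parallel is to drop the shared counter entirely and let Spark assign the index. The sketch below is not the original poster's code: the class and method names are made up for illustration, and it assumes the id only needs to be unique, which `zipWithIndex()` guarantees by computing a stable per-element `Long` index across all partitions.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ZipWithIndexExample {

    // Assigns each input record a unique id ("record:index") without any
    // shared mutable counter: zipWithIndex() derives a stable index for
    // every element from the partition sizes, so it is safe in parallel.
    public static List<String> assignPids(JavaSparkContext sc, List<String> users) {
        JavaRDD<String> temp = sc.parallelize(users);
        return temp.zipWithIndex()                  // JavaPairRDD<String, Long>
                   .map(t -> t._1() + ":" + t._2()) // e.g. "u1:0"
                   .collect();
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("pid-demo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println(assignPids(sc, Arrays.asList("u1", "u2", "u3")));
        sc.stop();
    }
}
```

Note that `zipWithIndex()` triggers an extra Spark job to compute partition sizes; if the ids only need to be unique rather than consecutive, `zipWithUniqueId()` avoids that job.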
Recommended Answer
From the Spark documentation:
Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums
...
Only the driver program can read the accumulator's value, using its value method.
Therefore, trying to read an accumulator's value from within a task in Spark means you are trying to read it from a worker, which goes against the design that the accumulator's value can only be read from the driver.
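A minimal sketch of the supported pattern (assuming the Spark 1.x `Accumulator` API the question uses; the class and app names here are made up): tasks only ever call `add`, and the driver reads `value()` after an action has completed.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class AccumulatorDemo {

    // Counts records with an accumulator. Tasks only ADD to it; the value
    // is read back on the driver, after the action (foreach) has finished.
    public static int countRecords(JavaSparkContext sc, List<String> records) {
        final Accumulator<Integer> accum = sc.accumulator(0);
        sc.parallelize(records).foreach(s -> accum.add(1));
        return accum.value(); // legal here: this line runs on the driver
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("accum-demo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println("count = " + countRecords(sc, Arrays.asList("a", "b", "c")));
        sc.stop();
    }
}
```

Also note that the value is only reliable after an action: transformations like map are lazy, so an accumulator incremented inside a map is not updated until something like `count()` or `foreach()` forces the computation.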