Spark Java累加器未递增 [英] Spark Java Accumulator not incrementing
问题描述
只是从Spark-Java的第一步开始.下面是一个单词计数程序,其中包括一个停用词列表,该列表将跳过列表中的单词.我有2个累加器来计算跳过的单词和未跳过的单词.
Just started with baby steps in Spark-Java. Below is a word count program that includes a stop word list that would skip words that are in the list. I have 2 accumulators to count the skipped words and unskipped words.
但是,程序末尾的Sysout
总是将两个累加器的值都设为0 .
However, the Sysout
at the end of program always gives both accumulator values to be 0.
请指出我要去哪里了.
public static void main(String[] args) throws FileNotFoundException {
SparkConf conf = new SparkConf();
conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> fileRDD = jsc.textFile("hello.txt");
JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String aLine) throws Exception {
return Arrays.asList(aLine.split(" "));
}
});
String[] stopWordArray = getStopWordArray();
final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);
final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);
JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {
public Boolean call(String inString) throws Exception {
boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
if(!filterCondition){
System.out.println("Filtered a stop word ");
skipAccumulator.add(1);
}else{
unSkipAccumulator.add(1);
}
return filterCondition;
}
});
System.out.println("$$$$$$$$$$$$$$$Filtered Count "+skipAccumulator.value());
System.out.println("$$$$$$$$$$$$$$$ UN Filtered Count "+unSkipAccumulator.value());
/* rest of code - works fine */
jsc.stop();
jsc.close();
}
我正在制作一个可运行的jar,并使用
I am making a runnable jar and submit the job on Hortonworks Sandbox 2.4 using
spark-submit jarname
------------ EDIT ----------------
带注释的部分中的代码的REST
REST of the code that goes in the commented portion
JavaPairRDD<String, Integer> wordOccurrence = filteredWords.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String inWord) throws Exception {
return new Tuple2<String, Integer>(inWord, 1);
}
});
JavaPairRDD<String, Integer> summed = wordOccurrence.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) throws Exception {
return a+b;
}
});
summed.saveAsTextFile("hello-out");
推荐答案
您错过了发布重要部分/* rest of code - works fine */
的机会.我可以保证在其余的代码中您正在调用某些操作.这会触发DAG使用累加器执行代码.尝试在println之前添加filteredWords.collect
,您应该会看到输出.请记住,Spark懒于转换,只对动作执行.
You missed posting the important part /* rest of code - works fine */
. I can just about guarantee that you are calling some action in that rest of code. Which triggers the DAG to execute the code with the accumulator. Try adding a filteredWords.collect
before the println and you should see the output. Remember that Spark is lazy on transformations, and only executes on actions.
这篇关于Spark Java累加器未递增的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!