Spark Java Accumulator not incrementing


Problem description


Just started with baby steps in Spark-Java. Below is a word count program that includes a stop word list that skips words appearing in the list. I have two accumulators to count the skipped and unskipped words.


However, the Sysout at the end of the program always reports both accumulator values as 0.

Please point out where I am going wrong.

public static void main(String[] args) throws FileNotFoundException {

    SparkConf conf = new SparkConf();
    conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> fileRDD = jsc.textFile("hello.txt");
    JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {

        public Iterable<String> call(String aLine) throws Exception {
            return Arrays.asList(aLine.split(" "));
        }
    });

    String[] stopWordArray = getStopWordArray();

    final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
    final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);

    final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);

    JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {

        public Boolean call(String inString) throws Exception {
            boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
            if (!filterCondition) {
                System.out.println("Filtered a stop word ");
                skipAccumulator.add(1);
            } else {
                unSkipAccumulator.add(1);
            }
            return filterCondition;
        }
    });

    System.out.println("$$$$$$$$$$$$$$$Filtered Count " + skipAccumulator.value());
    System.out.println("$$$$$$$$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());

    /* rest of code - works fine */
    jsc.stop();
    jsc.close();
}


I am building a runnable jar and submitting the job on Hortonworks Sandbox 2.4 using

spark-submit jarname
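
For reference, a fuller spark-submit invocation would normally name the main class and jar explicitly; the class and jar names below are placeholders, not taken from the question:

spark-submit --class com.example.WordCountApp --master local[*] wordcount.jar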

------------ EDIT ----------------


Rest of the code that goes in the commented portion:

JavaPairRDD<String, Integer> wordOccurrence = filteredWords.mapToPair(new PairFunction<String, String, Integer>() {

    public Tuple2<String, Integer> call(String inWord) throws Exception {
        return new Tuple2<String, Integer>(inWord, 1);
    }
});

JavaPairRDD<String, Integer> summed = wordOccurrence.reduceByKey(new Function2<Integer, Integer, Integer>() {

    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});

summed.saveAsTextFile("hello-out");

Recommended answer


You missed posting the important part: /* rest of code - works fine */. I can just about guarantee that you are calling some action in that rest of the code, which triggers the DAG to execute and run the code that updates the accumulators. Try adding a filteredWords.collect() before the println and you should see the output. Remember that Spark is lazy about transformations and only executes them when an action is called.
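
Concretely, a minimal sketch of that fix, reusing the filteredWords RDD and the two accumulators from the question (the only change is forcing an action before the existing printlns):

// filter() is a lazy transformation: at this point nothing has executed,
// so both accumulators still hold their initial value of 0.
// Calling an action forces the DAG to run, and the accumulator updates
// made on the executors are merged back into the driver.
filteredWords.collect();   // count() or the later saveAsTextFile would also work

// Read the accumulators only after the action has completed.
System.out.println("Filtered Count " + skipAccumulator.value());
System.out.println("UN Filtered Count " + unSkipAccumulator.value());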

