如何实现在星火自定义作业监听器/跟踪器? [英] How to implement custom job listener/tracker in Spark?

查看:242
本文介绍了如何实现在星火自定义作业监听器/跟踪器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个类像下面,当我通过命令行运行此我希望看到进展情况。有些东西一样,

  10%完成...
30%完成...
完成100%......工作完成了!

我使用的纱线火花1.0和使用Java API。

 公共类MyJavaWordCount {
    公共静态无效的主要(字串[] args)抛出异常{
        如果(args.length 2){
            通信System.err.println(用法:MyJavaWordCount<主><文件>);
            System.exit(1);
        }
        的System.out.println(ARGS [0]:其中;主&GT =+ ARGS [0]);
        的System.out.println(ARGS [1]:其中;文件> =+ ARGS [1]);        JavaSparkContext CTX =新JavaSparkContext(
                ARGS [0],
                MyJavaWordCount
                System.getenv(SPARK_HOME),
                System.getenv(SPARK_EXAMPLES_JAR));
        JavaRDD<串GT;线= ctx.textFile(参数[1],1);//输出输入输出
        JavaRDD<串GT;字= lines.flatMap(新FlatMapFunction<字符串,字符串>(){
            //输出输入
            公众可迭代<串GT;调用(String s)将{
                返回Arrays.asList(s.split());
            }
        });//ķV输入ķV
        JavaPairRDD<字符串,整数>那些= words.mapToPair(新PairFunction<字符串,字符串,整数>(){
            //ķV输入
            公共Tuple2<字符串,整数>调用(String s)将{
                //ķV
                返回新Tuple2<字符串,整数>(S,1);
            }
        });        JavaPairRDD<字符串,整数>数= ones.reduceByKey(新功能2<整数,整数,整数GT;(){
            公共电话整数(整数I1,I2整数){
                返回I1 + I2;
            }
        });        清单< Tuple2<字符串,整数>>输出= counts.collect();
        对于(Tuple2元组输出){
            的System.out.println(tuple._1 +:+ tuple._2);
        }
        System.exit(0);
    }
}


解决方案

如果您正在使用Scala火花所以尝试这一个。

下面是你SparkContext

  VAL SC =新SparkContext(sparkConf)

现在您可以在火花背景添加您的火花监听器

  sc.addSparkListener(新SparkListener(){
  覆盖高清onApplicationStart(applicationStart:SparkListenerApplicationStart){
    的println(星火ApplicationStart:+ applicationStart.appName);
  }  覆盖高清onApplicationEnd(applicationEnd:SparkListenerApplicationEnd){
    的println(星火ApplicationEnd:+ applicationEnd.time);
  }});

<一个href=\"https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.scheduler.SparkListener\">Here是的接口列表从星火时间表监听事件。

I have a class like below, and when i run this through command line i want to see progress status. some thing like,

10% completed... 
30% completed... 
100% completed...Job done!

I am using spark 1.0 on yarn and using Java API.

public class MyJavaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount <master> <file>");
            System.exit(1);
        }
        System.out.println("args[0]: <master>="+args[0]);
        System.out.println("args[1]: <file>="+args[1]);

        JavaSparkContext ctx = new JavaSparkContext(
                args[0],
                "MyJavaWordCount",
                System.getenv("SPARK_HOME"),
                System.getenv("SPARK_EXAMPLES_JAR"));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

//      output                                            input   output         
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            //              output       input 
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

//          K       V                                                input   K       V 
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            //            K       V             input 
            public Tuple2<String, Integer> call(String s) {
                //                K       V 
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2 tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
        System.exit(0);
    }
}

解决方案

If you are using scala-spark so try this one.

Here is your SparkContext

val sc=new SparkContext(sparkConf) 

Now you can add your spark listener in spark context

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("Spark ApplicationStart: " + applicationStart.appName);
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("Spark ApplicationEnd: " + applicationEnd.time);
  }

});

Here is the list of Interface for listening to events from the Spark schedule.

这篇关于如何实现在星火自定义作业监听器/跟踪器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆