如何使用Spark获取来自Kafka的数据 [英] How to get use data that coming from Kafka using Spark

查看:148
本文介绍了如何使用Spark获取来自Kafka的数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我编写了下面的代码来使用Spark Job消费数据,

I wrote the below code to consume data using Spark Job,

在获取检索后流式传输kafka或处理数据是否有什么缺失?我如何测试是否检索数据?

Is there anything missing for streaming kafka or processing data after got retrieve? How can I test Data is retrieved or not?

// StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
        ;
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("Ptopic", 1);

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5",
                topicMap);

        /*messages.foreach(new Function<JavaRDD<String, String>, Void>() {
            public Void call(JavaRDD<String, String> accessLogs) {
                  return null;
                }}
              );*/

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                /*System.out.println(tuple2._1().toString());
                System.out.println(tuple2._2().toString());*/
                return tuple2._2();
            }
        });

        lines.print();
        jssc.start();
        jssc.awaitTermination();

这里的结果就是打印..

Here result is just printing..

推荐答案

您可以使用map,reduce,flatmap等基本的大数据功能。

You could use the basic big data function like map, reduce, flatmap and so on.

更新1:

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            /*System.out.println(tuple2._1().toString());
            System.out.println(tuple2._2().toString());*/
            return tuple2._2();
        }
    });

    // TODO: make some transformation here:
    lines = lines.map(x -> { // clean data
        String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); // here some operations
        x.setCallType(callType);
        return x;
    }).filter(pair -> { // filter data
        return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*"); // here so filters
    });

    lines.print();
    jssc.start();
    jssc.awaitTermination(); 

此博客文章

这篇关于如何使用Spark获取来自Kafka的数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆