如何使用 Spark 获取来自 Kafka 的使用数据 [英] How to get use data that coming from Kafka using Spark

查看:27
本文介绍了如何使用 Spark 获取来自 Kafka 的使用数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我写了下面的代码来使用 Spark Job 消费数据,

I wrote the below code to consume data using Spark Job,

获取后流式传输 kafka 或处理数据是否缺少任何内容?如何测试数据是否被检索?

Is there anything missing for streaming kafka or processing data after got retrieve? How can I test Data is retrieved or not?

// StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
        ;
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("Ptopic", 1);

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5",
                topicMap);

        /*messages.foreach(new Function<JavaRDD<String, String>, Void>() {
            public Void call(JavaRDD<String, String> accessLogs) {
                  return null;
                }}
              );*/

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                /*System.out.println(tuple2._1().toString());
                System.out.println(tuple2._2().toString());*/
                return tuple2._2();
            }
        });

        lines.print();
        jssc.start();
        jssc.awaitTermination();

这里的结果只是打印..

Here result is just printing..

推荐答案

你可以使用基本的大数据功能,如map、reduce、flatmap等.

You could use the basic big data function like map, reduce, flatmap and so on.

更新 1:

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            /*System.out.println(tuple2._1().toString());
            System.out.println(tuple2._2().toString());*/
            return tuple2._2();
        }
    });

    // TODO: make some transformation here:
    lines = lines.map(x -> { // clean data
        String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); // here some operations
        x.setCallType(callType);
        return x;
    }).filter(pair -> { // filter data
        return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*"); // here so filters
    });

    lines.print();
    jssc.start();
    jssc.awaitTermination(); 

完整示例在这篇博客中进行了描述发帖

这篇关于如何使用 Spark 获取来自 Kafka 的使用数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆