How to consume data coming from Kafka using Spark
Question
I wrote the code below to consume data with a Spark job.
Is there anything missing for streaming from Kafka, or for processing the data after it is retrieved? How can I test whether data is actually being retrieved?
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");

// Create the context with a 1-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// One receiver thread for the topic "Ptopic"
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("Ptopic", 1);

// Receiver-based stream: ZooKeeper quorum, consumer group id, topic map
JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "5", topicMap);

// Keep only the message value; the Kafka key is dropped
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.print();

jssc.start();
jssc.awaitTermination();
Here the result is just being printed..
Answer
You can use the basic big-data transformations such as map, reduce, flatMap and so on.
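For instance, the classic word count chains flatMap, map and reduceByKey on the `lines` DStream. As a sketch of what those transformations compute, here is the same pipeline expressed with plain Java streams, which you can run without a Spark cluster (the class name `WordCountSketch` is just for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountSketch {
    // flatMap each line into words, then reduce by key: word -> occurrence count
    static Map<String, Long> wordCounts(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                wordCounts(Arrays.asList("spark kafka", "kafka streaming kafka"));
        System.out.println(counts.get("kafka")); // prints 3
    }
}
```

In the Spark job itself you would chain the equivalent `lines.flatMap(...).mapToPair(w -> new Tuple2<>(w, 1)).reduceByKey(Integer::sum)` and print or store the result per batch.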
Update 1:

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

// TODO: make some transformations here, e.g. clean and filter the raw messages.
// (The original snippet called getCallType()/setCallType() on a parsed record type,
// which does not compile on a DStream of Strings; the String equivalent is:)
lines = lines.map(x -> {
    // clean data: strip quotes, then '-', '|' and ',' characters
    return x.replaceAll("\"", "").replaceAll("[-|,]", "");
}).filter(line -> {
    // filter data: isFilteredOnFire is a boolean flag defined elsewhere in the job
    return !isFilteredOnFire || line.matches("(?i).*\\bFire\\b.*");
});

lines.print();

jssc.start();
jssc.awaitTermination();
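The cleaning and filtering logic can also be verified in isolation, without Spark or Kafka running. Below is a self-contained sketch; `cleanCallType` and `keepRecord` are hypothetical helper names, but the regexes are the same ones used in the stream above:

```java
public class CallTypeFilter {
    // Strip quotes, then the characters '-', '|' and ',' (same regexes as in the DStream map)
    static String cleanCallType(String raw) {
        return raw.replaceAll("\"", "").replaceAll("[-|,]", "");
    }

    // Keep everything when filtering is off; otherwise keep records containing the word "Fire"
    static boolean keepRecord(String callType, boolean isFilteredOnFire) {
        return !isFilteredOnFire || callType.matches("(?i).*\\bFire\\b.*");
    }

    public static void main(String[] args) {
        System.out.println(cleanCallType("\"Structure-Fire\""));  // prints StructureFire
        System.out.println(keepRecord("Structure Fire", true));   // prints true
        System.out.println(keepRecord("Medical Incident", true)); // prints false
    }
}
```

To confirm that records are arriving at all, a common trick is `lines.foreachRDD(rdd -> System.out.println("batch size: " + rdd.count()));` before `jssc.start()`: a non-zero count per batch shows the receiver is consuming from the topic.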
The complete example is described in this blog post.