如何使用Spark获取来自Kafka的数据 [英] How to get use data that coming from Kafka using Spark
本文介绍了如何使用Spark获取来自Kafka的数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我编写了下面的代码来使用Spark Job消费数据,
I wrote the below code to consume data using Spark Job,
在获取检索后流式传输kafka或处理数据是否有什么缺失?我如何测试是否检索数据?
Is there anything missing for streaming kafka or processing data after got retrieve? How can I test Data is retrieved or not?
// StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
;
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("Ptopic", 1);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "localhost:2181", "5",
topicMap);
/*messages.foreach(new Function<JavaRDD<String, String>, Void>() {
public Void call(JavaRDD<String, String> accessLogs) {
return null;
}}
);*/
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
/*System.out.println(tuple2._1().toString());
System.out.println(tuple2._2().toString());*/
return tuple2._2();
}
});
lines.print();
jssc.start();
jssc.awaitTermination();
这里的结果就是打印..
Here result is just printing..
推荐答案
您可以使用map,reduce,flatmap等基本的大数据功能。
You could use the basic big data function like map, reduce, flatmap and so on.
更新1:
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
/*System.out.println(tuple2._1().toString());
System.out.println(tuple2._2().toString());*/
return tuple2._2();
}
});
// TODO: make some transformation here:
lines = lines.map(x -> { // clean data
String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); // here some operations
x.setCallType(callType);
return x;
}).filter(pair -> { // filter data
return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*"); // here so filters
});
lines.print();
jssc.start();
jssc.awaitTermination();
这篇关于如何使用Spark获取来自Kafka的数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文