Spark Streaming: Filtering the Streaming Data
Question
I am trying to filter streaming data and, based on the value of the id column, save each record to a different table.
I have two tables:
- testTable_odd(id,data1,data2)
- testTable_even(id,data1)
If the id value is odd, I want to save the record to testTable_odd; if it is even, I want to save it to testTable_even.
The tricky part is that the two tables have different columns. I tried several approaches, including Scala functions with return type Either[obj1, obj2], but could not get any of them to work. Any pointers would be greatly appreciated.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns

object StreamProcessor extends Serializable {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // Kafka topic names are taken from the command-line arguments
    val topics = args.toSet
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream
      .map { case (_, msg) =>
        val result = msgParseMaster(msg)
        // wordCount has no `data` field, so data1 is used here
        (result.id, result.data1)
      }
      .foreachRDD { rdd =>
        // Skip empty micro-batches; everything still goes to a single table
        if (!rdd.isEmpty) rdd.saveToCassandra("testKS", "testTable", SomeColumns("id", "data1"))
      }

    ssc.start()
    ssc.awaitTermination()
  }

  import org.json4s._
  import org.json4s.native.JsonMethods._

  // Case classes are already Serializable
  case class wordCount(id: Long, data1: String, data2: String)

  // json4s needs an implicit Formats in scope for extract[...]
  implicit val formats: Formats = DefaultFormats

  // Parse one JSON message into a wordCount record
  def msgParseMaster(msg: String): wordCount =
    parse(msg).extract[wordCount]
}
Recommended answer
I performed the following steps:
1) Extracted the details from the raw JSON string using a case class.
2) Created a super JSON (containing the details required by both filter criteria).
3) Converted that JSON into a DataFrame.
4) Ran select and where clauses on that DataFrame.
5) Saved the results to Cassandra.
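The answer gives no code, so here is a minimal sketch of how steps 3 to 5 might look. It is an illustration under assumptions, not the answerer's actual implementation. It assumes each Kafka message is already a JSON string with the fields id, data1 and data2, that the keyspace is testKS as in the question, and that the Cassandra connector's DataFrame data source is on the classpath. The names ParityRouter, splitByParity, saveTo and route are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

// Hypothetical helper object; not part of the original question or answer.
object ParityRouter {

  // Step 4: one where + select per target table, so each result carries
  // only the columns that its table defines.
  def splitByParity(df: DataFrame): (DataFrame, DataFrame) = {
    val odd  = df.where("id % 2 <> 0").select("id", "data1", "data2")
    val even = df.where("id % 2 = 0").select("id", "data1")
    (odd, even)
  }

  // Step 5 (assumed wiring): append a DataFrame to a Cassandra table
  // through the connector's data source.
  def saveTo(df: DataFrame, keyspace: String, table: String): Unit =
    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> keyspace, "table" -> table))
      .mode(SaveMode.Append)
      .save()

  // Steps 3 to 5 for one micro-batch of JSON strings.
  def route(sqlContext: SQLContext, jsonRdd: RDD[String]): Unit =
    if (!jsonRdd.isEmpty) { // an empty batch has no inferred schema to query
      val df = sqlContext.read.json(jsonRdd) // step 3: JSON to DataFrame
      val (odd, even) = splitByParity(df)
      saveTo(odd, "testKS", "testTable_odd")
      saveTo(even, "testKS", "testTable_even")
    }
}
```

With the stream from the question, this could be called as stream.map(_._2).foreachRDD(rdd => ParityRouter.route(sqlContext, rdd)). Because each table gets its own projection, no Either return type is needed. The batch is parsed once per write, so caching df before the two saves may be worthwhile for larger batches.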