Spark Streaming: Filtering the Streaming Data


Question

I am trying to filter the streaming data, and based on the value of the id column I want to save the data to different tables.

I have two tables:

  1. testTable_odd(id,data1,data2)
  2. testTable_even(id,data1)

If the id value is odd, I want to save the record to the testTable_odd table, and if the value is even, I want to save it to testTable_even.

The tricky part here is that my two tables have different columns. I have tried multiple ways, for example a Scala function with return type Either[obj1, obj2], but I wasn't able to succeed. Any pointers would be greatly appreciated.
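One direction I tried was to drop the Either idea and instead split the parsed stream with two filter calls, writing each side with its own column list. A rough sketch of that idea only (it assumes the msgParseMaster helper, the wordCount case class, and the testKS keyspace from my full code below):

    // sketch: split the parsed DStream by id parity and save each side to its own
    // table with its own SomeColumns list; table names come from the question
    val parsed = stream.map { case (_, msg) => msgParseMaster(msg) }

    // odd ids -> testTable_odd(id, data1, data2)
    parsed.filter(_.id % 2 != 0).foreachRDD { rdd =>
      if (!rdd.isEmpty)
        rdd.map(w => (w.id, w.data1, w.data2))
          .saveToCassandra("testKS", "testTable_odd", SomeColumns("id", "data1", "data2"))
    }

    // even ids -> testTable_even(id, data1)
    parsed.filter(_.id % 2 == 0).foreachRDD { rdd =>
      if (!rdd.isEmpty)
        rdd.map(w => (w.id, w.data1))
          .saveToCassandra("testKS", "testTable_even", SomeColumns("id", "data1"))
    }

This is what I have so far (it writes everything to a single table):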

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

import com.datastax.spark.connector._
import com.datastax.spark.connector.SomeColumns

import kafka.serializer.StringDecoder

object StreamProcessor extends Serializable {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = args.toSet

    // direct Kafka stream of (key, message) pairs
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // parse each message and, for now, write everything to a single table
    // (this is the part I want to split by odd/even id)
    stream
      .map { case (_, msg) =>
        val result = msgParseMaster(msg)
        (result.id, result.data1) // data1 goes into the single table's "data" column
      }
      .foreachRDD { rdd =>
        if (!rdd.isEmpty)
          rdd.saveToCassandra("testKS", "testTable", SomeColumns("id", "data"))
      }

    ssc.start()
    ssc.awaitTermination()
  }

  import org.json4s._
  import org.json4s.native.JsonMethods._

  case class wordCount(id: Long, data1: String, data2: String) extends Serializable

  implicit val formats = DefaultFormats

  // parse the raw JSON message into the case class
  def msgParseMaster(msg: String): wordCount =
    parse(msg).extract[wordCount]
}

Answer

I have performed the steps below:

  1. Extracted the details from the raw JSON string with a case class.
  2. Created a super JSON (one that has the details required by both filter criteria).
  3. Converted that JSON into a DataFrame.
  4. Performed the select and where clauses on that DataFrame.
  5. Saved it to Cassandra.
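A rough sketch of how those steps could be wired together inside foreachRDD, assuming the Spark 1.x SQLContext and SaveMode from the question's code and the two table names from the question; this illustrates the approach rather than reproducing the exact code:

    // 1) keep the raw JSON string of each Kafka message
    stream.map { case (_, msg) => msg }.foreachRDD { jsonRdd =>
      if (!jsonRdd.isEmpty) {
        // 2) + 3) read the "super" JSON (all fields) into a DataFrame
        val df = sqlContext.read.json(jsonRdd)

        // 4) select / where per filter criterion
        val odd  = df.where("id % 2 != 0").select("id", "data1", "data2")
        val even = df.where("id % 2 = 0").select("id", "data1")

        // 5) save each DataFrame to its own Cassandra table
        odd.write.format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "testKS", "table" -> "testTable_odd"))
          .mode(SaveMode.Append).save()

        even.write.format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "testKS", "table" -> "testTable_even"))
          .mode(SaveMode.Append).save()
      }
    }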
