Spark Streaming + Hive [英] Spark Streaming + Hive

查看:106
本文介绍了Spark Streaming + Hive的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在构建一个应用程序,该应用程序通过水槽从源系统中获取数据,然后借助Kafka消息系统来触发流以进行内存处理,在将数据处理到数据帧中之后,我们会将数据放入配置单元表. 流程如下 源系统-> Flume-> Kafka-> Spark Streaming-> Hive ,它是正确的流量还是我们需要对其进行审查?

We are in a process to build a application that takes data from source system through flume and then with the help of Kafka message system to spark streaming for in memory processing, after processing data into data frame we will put data into hive tables. Flow will be as follows Source System -> Flume -> Kafka -> Spark Streaming -> Hive , Is it correct flow or we need to review it?

我们正在使用离散流并将其转换为SQL兼容功能的数据帧.现在,我们在蜂巢中有14个表,必须根据代码类型加载数据.如果我们看到下面的代码,则在将Dstream分配给特定的foreachRDD之前对其进行过滤,对于14个过滤器,我们必须对14个foreachRRD主体进行14次过滤

We are taking Discrete stream and converting it into data frame for SQL compatibility functions. Now we have 14 tables in hive where we have to load data according to code type. If we see code below we are filtering our Dstream before giving it to specific foreachRDD and for 14 filters we have to filter it 14 times for separate 14 foreachRRD bodies

val fil_sms = lines.filter(_.startsWith("1|"))
    val fil_calls = lines.filter(_.startsWith("7|"))

我们可以在单个foreachRDD主体中处理此问题吗,因为我已经尝试过,但是它仅过滤一行. 如果有人在此代码中提供帮助,以使其在性能和实现方面更好,我是否在执行正确的过程. 希望你能理解我的查询

Can we handle this in single foreachRDD body because i have tried but it is filtering only one line. Am i doing right procedure if someone help in this code to make it more better for performance and implementation. Hopefully you understand my query

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.FileSystem
import java.net.URI
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.types._
    object test1 {

      case class cdrsms(col1: String , col2: String , col3: String , col4: String ,
                       col5: String , col6: String , col7: String , col8: String,
                       col9: String ,  col10: String)


      case class cdrcalls(col1: String , col2: String , col3: String , col4: String ,
                        col5: String , col6: String , col7: String , col8: String,
                        col9: String ,  col10: String , col11: String ,col12 : String , col13 : String)

      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("hwtest").setMaster("local[*]")
        val topicsSet = "hwkf01".split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.2.210:9092")
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(20))
        val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
        val lines = messages.map(_._2)
        val fil_sms = lines.filter(_.startsWith("1|"))
        val fil_calls = lines.filter(_.startsWith("7|"))
       val sqlContext = new HiveContext(sc)
        import sqlContext.implicits._

        fil_sms.foreachRDD(rdd=> if(!rdd.isEmpty){
          //val sms = rdd.filter(_.startsWith("1|"))
          rdd.map(_.split('|')).map(p => cdrsms(p(0), p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9))).toDF().registerTempTable("cdr_sms")
          val tbl1 = sqlContext.sql("SELECT * FROM cdr_sms")
          tbl1.foreach(println)
          sqlContext.sql("insert into table sms select * from cdr_data")
        })

        fil_calls.foreachRDD(rdd=> if(!rdd.isEmpty){
          rdd.map(_.split('|')).map(p => cdrcalls(p(0), p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12))).toDF().registerTempTable("cdr_calls")
          val tbl1 = sqlContext.sql("SELECT * FROM cdr_calls")
          tbl1.foreach(println)

          sqlContext.sql("insert into table calls select * from cdr_data")
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }

推荐答案

我正在等待专家的回答,这是因为我尝试了很多次锻炼后,并在单个foreachRDD中将代码更改为以下代码,如果可行,请指导.两种代码都可以正常工作并且具有相同的最终结果,但是我正在寻找实现相同结果的最佳实践.我删除了临时表的创建,现在直接插入到配置单元表中,这肯定会节省物理内存和处理时间.

I am waiting for expert answer mean while I have tried many workouts and I changed my code to below in single foreachRDD, please guide if is good way to go with. Both codes are working fine and have same end result but I am finding best practice to achieve same result. I have removed temporary table creation and now directly inserting to hive table that will definitely save physical memory and processing time.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.FileSystem
import java.net.URI
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.types._

object test1 {

  case class cdrsms(col1: String , col2: String , col3: String , col4: String ,
                   col5: String , col6: String , col7: String , col8: String,
                   col9: String ,  col10: String)


  case class cdrcalls(col1: String , col2: String , col3: String , col4: String ,
                    col5: String , col6: String , col7: String , col8: String,
                    col9: String ,  col10: String , col11: String ,col12 : String , col13 : String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("hwtest").setMaster("local[*]")
    val topicsSet = "hwkf01".split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.2.210:9092")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(20))
    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)

   val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    lines.foreachRDD(rdd=> if(!rdd.isEmpty){
      val sms = rdd.filter(_.startsWith("7|"))
      val calls = rdd.filter(_.startsWith("1|"))

      sms.map(_.split('|'))
        .map(p => cdrsms(p(0), p(1),p(2), p(3),p(4),p(5),p(6),p(7),p(8),p(9)))
        .toDF()
        .write.mode("append")
        .insertInto("sms_cdr")

      calls.map(_.split('|'))
        .map(p => cdrcalls(p(0), p(1),p(2), p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12)))
        .toDF()
        .write.mode("append")
        .insertInto("calls_cdr")

   })


    ssc.start()
    ssc.awaitTermination()
  }
}

这篇关于Spark Streaming + Hive的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆