SQL over Spark Streaming


Question

This is the code to run simple SQL queries over Spark Streaming.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingSQL {

  case class Persons(name: String, age: Int)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // For each batch RDD: parse the lines into Persons, register them as a
    // temporary table, and run a SQL query over that table

    lines.foreachRDD(rdd => {
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

As you can see, to run SQL over streaming data, the query has to be made inside the foreachRDD method. I want to run a SQL join over data received from two different streams. Is there any way this can be done?

Answer

Well, I would like to sum up the workaround we arrived at after the discussion in the answer by Spiro. His suggestion to first create an empty table and then insert RDDs into it was bang on. The only problem is that Spark doesn't allow inserting into tables yet. Here is what can be done:

First, create an RDD that has the same schema as the one you're expecting from your stream:

case class Rec(name: String, age: Int)  // schema the stream is expected to produce
import sqlContext.createSchemaRDD
val d1 = sc.parallelize(Array(("a", 10), ("b", 3))).map(e => Rec(e._1, e._2))

Then save it as a Parquet file:

d1.saveAsParquetFile("/home/p1.parquet")

Now, load the Parquet file and register it as a table using the registerAsTable() method.

val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")
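
As a quick sanity check (this query is an assumption on my part, not part of the original answer), you can run SQL on the freshly registered table before starting the stream; it should return only the two dummy rows saved above:

// At this point "data" should contain only the rows ("a",10) and ("b",3)
sqlContext.sql("SELECT * FROM data").collect().foreach(println)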

Now, when you receive your stream, just apply foreachRDD() on it and keep inserting the individual RDDs into the table created above using the insertInto() method:

dStream.foreachRDD(rdd => {
  rdd.insertInto("data")
})

This insertInto() works fine and allows the data to be collected into a table. Now you can do the same for any number of streams and then run your queries, as sketched below.
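
To come back to the original question of joining two streams, the same pattern extends naturally. The sketch below is my own extrapolation of the answer above, not code from it: it assumes two input DStreams named lines1 and lines2, two parquet-backed tables registered as "stream1" and "stream2" in exactly the same way "data" was created, and the Rec case class defined earlier.

// Minimal sketch under the assumptions above: append each stream to its own table
lines1.foreachRDD(rdd => {
  rdd.map(_.split(",")).map(p => Rec(p(0), p(1).trim.toInt)).insertInto("stream1")
})
lines2.foreachRDD(rdd => {
  rdd.map(_.split(",")).map(p => Rec(p(0), p(1).trim.toInt)).insertInto("stream2")
})

// Join whatever has accumulated in both tables; re-run this query whenever fresh results are needed
val joined = sqlContext.sql(
  "SELECT stream1.name, stream1.age, stream2.age " +
  "FROM stream1 JOIN stream2 ON stream1.name = stream2.name")
joined.collect().foreach(println)

Because each foreachRDD call only appends rows, the join always runs over everything received so far on both streams.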
