Getting error saying "Queries with streaming sources must be executed with writeStream.start()" on Spark Structured Streaming

Problem description

I am getting some issues while executing Spark SQL on top of Spark Structured Streaming. The error is shown below.

Here is my code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object sparkSqlIntegration {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
      .getOrCreate()

    setupLogging()
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    // Create a stream of CSV files dumped into the csvFolder directory
    val rawData = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///C:/Users/R/Documents/spark-poc-centri/csvFolder")

    // Must import spark.implicits for conversion to DataSet to work!
    import spark.implicits._
    rawData.createOrReplaceTempView("updates")
    val sqlResult = spark.sql("select * from updates")
    println("sql results here")
    sqlResult.show()
    println("Others")
    val query = rawData.writeStream.outputMode("append").format("console").start()

    // Keep going until we're stopped.
    query.awaitTermination()

    spark.stop()
  }
}

During execution, I get the following error. As I am new to streaming, can anyone tell me how I can execute Spark SQL queries on Spark Structured Streaming?

2018-12-27 16:02:40 INFO  BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, LAPTOP-5IHPFLOD, 6829, None)
2018-12-27 16:02:41 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6731787b{/metrics/json,null,AVAILABLE,@Spark}
sql results here
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///C:/Users/R/Documents/spark-poc-centri/csvFolder]
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:392)

Answer

You don't need these lines:

import spark.implicits._
rawData.createOrReplaceTempView("updates")
val sqlResult = spark.sql("select * from updates")
println("sql results here")
sqlResult.show()
println("Others")

Most importantly, select * isn't needed. When you print the dataframe, you will already see all the columns. Therefore, you also don't need to register the temp view just to give it a name.

And when you use format("console"), that eliminates the need for .show().
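
Putting those points together, a minimal corrected version of the code from the question might look like this. This is a sketch that keeps the same CSV source, schema, and checkpoint location, and simply drops the temp view, the spark.sql call, and .show():

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object sparkSqlIntegration {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
      .getOrCreate()

    val userSchema = new StructType().add("name", "string").add("age", "integer")

    // Streaming DataFrame over CSV files dropped into the folder
    val rawData = spark.readStream
      .option("sep", ",")
      .schema(userSchema)
      .csv("file:///C:/Users/R/Documents/spark-poc-centri/csvFolder")

    // The console sink prints each micro-batch, so no .show() is needed
    val query = rawData.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
    spark.stop()
  }
}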

Refer to the Spark examples for reading from a network socket and outputting to the console.

val words = // omitted ... some Streaming DataFrame

// Generating a running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
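
For reference, in the documentation that example builds the words DataFrame from a socket source along these lines (assuming something like nc -lk 9999 is feeding text on localhost):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
import spark.implicits._

// Lines streamed from a TCP socket; the DataFrame has a single "value" column
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each line into words, so groupBy("value").count() above counts words
val words = lines.as[String].flatMap(_.split(" "))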

Takeaway: use DataFrame operations like .select() and .groupBy() rather than raw SQL.

Or you can use Spark Streaming (the older DStream API), as shown in those examples: you foreachRDD over each stream batch, convert each batch to a DataFrame, and then query that DataFrame with SQL.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

val words = // omitted ... some DStream

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
  // Get the singleton instance of SparkSession
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  // Convert RDD[String] to RDD[case class] to DataFrame
  val wordsDataFrame = rdd.map(w => Record(w)).toDF()

  // Creates a temporary view using the DataFrame
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on table using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  println(s"========= $time =========")
  wordCountsDataFrame.show()
}

ssc.start()
ssc.awaitTermination()
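
For context, the surrounding pieces of that example (the StreamingContext ssc, the words DStream, and the SparkSessionSingleton helper) are set up roughly like this in the Spark docs; the host and port here are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// DStream of words read from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder.config(sparkConf).getOrCreate()
    }
    instance
  }
}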
