出现错误,提示“必须使用writeStream.start()执行带有流源的查询".在火花结构化的流媒体上 [英] Getting error saying "Queries with streaming sources must be executed with writeStream.start()" on spark structured streaming
问题描述
在spark结构流之上执行spark SQL时遇到一些问题.PFA错误.
I am getting some issues while executing spark SQL on top of spark structures streaming. PFA for error.
这是我的代码
object sparkSqlIntegration {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
.getOrCreate()
setupLogging()
val userSchema = new StructType().add("name", "string").add("age", "integer")
// Create a stream of text files dumped into the logs directory
val rawData = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///C:/Users/R/Documents/spark-poc-centri/csvFolder")
// Must import spark.implicits for conversion to DataSet to work!
import spark.implicits._
rawData.createOrReplaceTempView("updates")
val sqlResult= spark.sql("select * from updates")
println("sql results here")
sqlResult.show()
println("Otheres")
val query = rawData.writeStream.outputMode("append").format("console").start()
// Keep going until we're stopped.
query.awaitTermination()
spark.stop()
}
}
在执行过程中,出现以下错误.我是流媒体新手,谁能告诉我如何在Spark结构化流媒体上执行Spark SQL查询
During execution, I am getting the following error. As I am new to streaming can anyone tell how can I execute spark SQL queries on spark structured streaming
2018-12-27 16:02:40 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, LAPTOP-5IHPFLOD, 6829, None)
2018-12-27 16:02:41 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6731787b{/metrics/json,null,AVAILABLE,@Spark}
sql results here
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///C:/Users/R/Documents/spark-poc-centri/csvFolder]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
推荐答案
您不需要这些行
import spark.implicits._
rawData.createOrReplaceTempView("updates")
val sqlResult= spark.sql("select * from updates")
println("sql results here")
sqlResult.show()
println("Otheres")
最重要的是,不需要 select *
.当您打印数据框时,您将已经看到所有列.因此,您也不需要注册临时视图即可为其命名.
Most importantly, select *
isn't needed. When you print the dataframe, you would already see all the columns. Therefore, you also don't need to register the temp view to give it a name.
当您使用 format("console")
时,就无需使用 .show()
And when you format("console")
, that eliminates the need for .show()
Refer to the Spark examples for reading from a network socket and output to console.
val words = // omitted ... some Streaming DataFrame
// Generating a running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
带走-使用诸如 .select()
和 .groupBy()
之类的DataFrame操作,而不是原始SQL
Take away - use DataFrame operations like .select()
and .groupBy()
rather than raw SQL
Or you can use Spark Streaming, as shown in those examples, you need to foreachRDD
over each stream batch, then convert these to a DataFrame, which you can query
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
val words = // omitted ... some DStream
// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
这篇关于出现错误,提示“必须使用writeStream.start()执行带有流源的查询".在火花结构化的流媒体上的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!