Queries with streaming sources must be executed with writeStream.start();
Problem Description
I'm trying to read the messages from Kafka (version 10) in Spark and print them.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// collect is not supported on a streaming Dataset -- this line triggers
// the AnalysisException below
ds1.collect.foreach(println)

ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()
getting an error Exception in thread "main"

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Recommended Answer
You are branching the query plan: from the same ds1 you are trying to:

- ds1.collect.foreach(...)
- ds1.writeStream.format(...){...}
But you are only calling .start() on the second branch, leaving the other dangling without a termination, which in turn throws the exception you are getting back.
The solution is to start both branches and await termination. Note that collect cannot be used on a streaming Dataset at all; the console sink prints the data instead, so each branch becomes a started writeStream query:
val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// first branch: the console sink replaces collect.foreach(println)
val query1 = ds1.writeStream
  .format("console")
  .start()

// second branch, also started
val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()

query1.awaitTermination()
query2.awaitTermination()
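As a side note, when several streaming queries run in the same session, Spark's StreamingQueryManager (available as spark.streams) can wait on all of them at once instead of chaining awaitTermination calls. A minimal sketch of that pattern, using the built-in "rate" source so it runs without a Kafka broker; the object name AwaitAnyDemo and the 5-second timeout are illustrative, not from the original post:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object for illustration.
object AwaitAnyDemo {
  // Returns true if any running query terminated within the timeout.
  def runDemo(): Boolean = {
    val spark = SparkSession.builder
      .appName("AwaitAnyDemo")
      .config("spark.master", "local[2]")
      .getOrCreate()

    // The built-in "rate" source emits (timestamp, value) rows locally,
    // standing in for the Kafka source from the question.
    val ds = spark.readStream.format("rate")
      .option("rowsPerSecond", "1")
      .load()

    // Two branches of the same plan, each with its own started sink.
    val q1 = ds.writeStream.format("console").start()
    val q2 = ds.writeStream.format("console").start()

    // Block until any query stops or fails, or the timeout elapses.
    val terminated = spark.streams.awaitAnyTermination(5000L)

    q1.stop(); q2.stop(); spark.stop()
    terminated
  }
}
```

In a long-running job you would call awaitAnyTermination() without a timeout, which blocks until some query stops or fails.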