Queries with streaming sources must be executed with writeStream.start();

Question

I'm trying to read messages from Kafka (version 10) in Spark and print them.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .config("spark.master", "local")
      .getOrCreate()
    import spark.implicits._

    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()

    ds1.collect.foreach(println)
    ds1.writeStream
      .format("console")
      .start()

    ds1.printSchema()

I get this error:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Recommended answer

You are branching the query plan. From the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...).start()

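But only the second branch is started with .start(). The first branch is never turned into a streaming query: collect is a batch action, so it asks Spark to run the plan as a one-off batch job, and Spark refuses to do that for any plan that reads from a streaming source. That refusal is the exception you are getting back.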
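The rule behind the exception can be made concrete with Dataset.isStreaming, which reports whether a plan reads from a streaming source. The sketch below is a hypothetical helper, printRows (it is not part of Spark), that uses collect() only for batch data and falls back to a console sink for streaming data. It assumes Spark 2.x or 3.x and uses the built-in rate source in its usage example only so that it runs without a Kafka broker.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object SafePrint {
  // Hypothetical helper (not a Spark API): prints the rows of a DataFrame
  // using whichever execution mode its plan allows.
  // Returns Some(query) for a streaming input (the caller must await or stop it)
  // and None for a batch input, which is printed immediately.
  def printRows(df: DataFrame): Option[StreamingQuery] =
    if (df.isStreaming) {
      // The plan reads from a streaming source: collect() here would throw
      // AnalysisException, so start a streaming query with a console sink instead.
      Some(df.writeStream.format("console").start())
    } else {
      // Ordinary batch plan: collect() is allowed.
      df.collect().foreach(println)
      None
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("SafePrint").getOrCreate()

    // Batch input: printed right away via collect()
    printRows(spark.range(3).toDF())

    // Streaming input (built-in "rate" source): printed by a console sink
    val q = printRows(spark.readStream.format("rate").load())
    q.foreach(_.awaitTermination(10000)) // let it run for about 10 seconds
    q.foreach(_.stop())
    spark.stop()
  }
}
```

Returning the StreamingQuery instead of blocking inside the helper keeps the caller in charge of awaitTermination(), which matters when several streaming branches must run side by side, as in the question.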

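The solution is to turn every branch into a streaming query: replace collect.foreach(println) with a console sink, start both queries with writeStream...start(), and await their termination. ds1.printSchema() can stay as it is, because it only inspects the schema and does not execute the query.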

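    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()

    // Branch 1: replaces ds1.collect.foreach(println). collect() cannot run on a
    // streaming Dataset, so print the decoded key/value pairs through a console sink.
    val query1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()

    // Branch 2: print the raw Kafka rows (binary key/value plus metadata columns)
    val query2 = ds1.writeStream
      .format("console")
      .start()

    // printSchema() only inspects the schema; it does not execute the query
    ds1.printSchema()
    query1.awaitTermination()
    query2.awaitTermination()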
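If the goal really is collect-style printing, one option (assuming Spark 2.4 or later, where foreachBatch was added) is to call collect() inside foreachBatch, where each micro-batch is handed over as an ordinary batch DataFrame. This is a sketch, not part of the original answer: the object name KafkaForeachBatchPrint and the printBatch function are illustrative, and the broker address and topic are the ones from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaForeachBatchPrint {
  // Runs once per micro-batch. batchDF is an ordinary (non-streaming) DataFrame,
  // so batch actions such as collect() are allowed inside this function.
  // Declared as an explicitly typed value so that foreachBatch unambiguously
  // picks the Scala (Dataset, Long) => Unit overload.
  val printBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
    println(s"--- batch $batchId ---")
    batchDF.collect().foreach(println)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("KafkaForeachBatchPrint")
      .master("local[*]")
      .getOrCreate()

    // Same Kafka source as in the question, with key/value decoded to strings
    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // The streaming plan is still executed only through writeStream...start()
    val query = ds1.writeStream
      .foreachBatch(printBatch)
      .start()

    query.awaitTermination()
  }
}
```

Because collect() pulls every micro-batch onto the driver, this pattern only suits small topics or debugging; for anything larger, the console sink used in the answer is the safer choice.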
