Queries with streaming sources must be executed with writeStream.start();

This article describes how to resolve the error "Queries with streaming sources must be executed with writeStream.start();" and should be a useful reference for anyone hitting the same problem.

Problem Description

I am trying to read data from Kafka using Spark Structured Streaming and make predictions on the incoming data, using a model that I trained with Spark ML.

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, VectorIndexer, Word2Vec}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .master("local")
  .getOrCreate()
import spark.implicits._

// Decode the raw Kafka payload to a String (defined but never used below;
// the selectExpr cast does the same job).
val toString = udf((payload: Array[Byte]) => new String(payload))

// Streaming source: subscribe to the Kafka topic and cast value to String.
val sentenceDataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicname1")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
sentenceDataFrame.printSchema()

// Tokenize on non-word characters.
val regexTokenizer = new RegexTokenizer()
  .setInputCol("value")
  .setOutputCol("words")
  .setPattern("\\W")
val tokencsv = regexTokenizer.transform(sentenceDataFrame)

val remover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered")
val removestopdf = remover.transform(tokencsv)

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("filtered")
  .setOutputCol("result")
  .setVectorSize(300)
  .setMinCount(0)

// This is the line the exception points at: fit() on a streaming Dataset.
val model = word2Vec.fit(removestopdf)

val result = model.transform(removestopdf)

val featureIndexer = new VectorIndexer()
  .setInputCol("result")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(2)
  .fit(result)

val some = featureIndexer.transform(result)

// Pre-trained classifier loaded from disk.
val model1 = RandomForestClassificationModel.load("/home/akhil/Documents/traindata/stages/2_rfc_80e12c5d1259")

val predict = model1.transform(result)

val query = predict.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

When I run the prediction on the streaming data, it gives me the following error:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: 
 Queries with streaming sources must be executed with 
writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:175)
at predict1model$.main(predict1model.scala:53)
at predict1model.main(predict1model.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

The error refers to the word2Vec.fit(removestopdf) line. Any help would be really appreciated.

Recommended Answer

In general, Structured Streaming cannot (yet, as of Spark 2.2) be used to train Spark ML models. Some operations are not supported on streaming Datasets, and one of them is converting a Dataset to its RDD representation. Word2Vec in particular needs to go down to the RDD level to implement fit.
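That restriction is easy to reproduce in isolation (a minimal sketch; spark is an active SparkSession and the built-in rate source stands in for any streaming source):

val streaming = spark.readStream.format("rate").load()

// Forcing a streaming Dataset down to its RDD representation fails at
// analysis time with exactly the exception shown in the question:
streaming.rdd
// org.apache.spark.sql.AnalysisException: Queries with streaming sources
// must be executed with writeStream.start();;

This matches the stack trace above: Dataset.rdd triggers QueryExecution.assertSupported, which rejects any batch-style execution over a streaming source.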

Nevertheless, it is possible to train the model on a static dataset and then apply the predictions to the streaming data. The transform operation is usable on a streaming Dataset, exactly as above: val result = model.transform(removestopdf).

In a nutshell, we need to fit the model on a static dataset. The resulting transformer can then be applied to a streaming Dataset.

