由于与正在读取的主题不同的Kafka主题发生错误,Spark Streaming失败 [英] Spark Streaming failing due to error on a different Kafka topic than the one being read
问题描述
对于以下编写主题/阅读主题 air2008rand
串联:
For the following write topic/read topic air2008rand
tandem :
import org.apache.spark.sql.streaming.Trigger
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "air2008rand")
.load()
.groupBy('value.cast("string").as('key))
.agg(count("*").cast("string") as 'value)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("includeTimestamp", true)
.option("topic","t1")
.trigger(Trigger.ProcessingTime("2 seconds"))
.outputMode("update")
.option("checkpointLocation","/tmp/cp")
.start)
由于主题 air2008m1-0
不同而产生错误:
There is an error generated due to a a different topic air2008m1-0
:
scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0]
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone.
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
通过停止读/写代码(在spark-shell repl
中)然后重新运行它,可以重复此行为.
This behavior is repeatable by stopping the read/write code (in spark-shell repl
) and then re-running it.
为什么这里的不同kafka主题之间存在串扰"?
Why is there "cross-talk" between different kafka topics here?
推荐答案
问题是由于检查点目录中包含来自早期Spark Streaming操作的数据.解决方法是更改检查点目录.
The problem is due to a checkpoint directory containing data from an earlier spark streaming operation. The resolution is to change the checkpoint directory.
该解决方案被发现为对此问题的评论(来自@jaceklaskowski本人)
The solution was found as a comment (from @jaceklaskowski himself) in this question [IllegalStateException]: Spark Structured Streaming is termination Streaming Query with Error
这篇关于由于与正在读取的主题不同的Kafka主题发生错误,Spark Streaming失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!