Spark Streaming 由于与正在读取的主题不同的 Kafka 主题出错而失败 [英] Spark Streaming failing due to error on a different Kafka topic than the one being read
问题描述
对于以下写主题/读主题air2008rand
串联:
For the following write topic/read topic air2008rand
tandem :
import org.apache.spark.sql.streaming.Trigger
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "air2008rand")
.load()
.groupBy('value.cast("string").as('key))
.agg(count("*").cast("string") as 'value)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("includeTimestamp", true)
.option("topic","t1")
.trigger(Trigger.ProcessingTime("2 seconds"))
.outputMode("update")
.option("checkpointLocation","/tmp/cp")
.start)
由于不同主题air2008m1-0
而产生错误:
scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0]
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone.
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
通过停止读/写代码(在 spark-shell repl
中)然后重新运行它,这种行为是可重复的.
This behavior is repeatable by stopping the read/write code (in spark-shell repl
) and then re-running it.
为什么这里不同的 kafka 主题之间存在串扰"?
Why is there "cross-talk" between different kafka topics here?
推荐答案
问题是由于检查点目录包含来自早期 Spark 流操作的数据.解决方法是更改检查点目录.
The problem is due to a checkpoint directory containing data from an earlier spark streaming operation. The resolution is to change the checkpoint directory.
在这个问题的评论中找到了解决方案(来自@jaceklaskowski 本人)[IllegalStateException]: Spark Structured Streaming 正在终止 Streaming Query with Error
The solution was found as a comment (from @jaceklaskowski himself) in this question [IllegalStateException]: Spark Structured Streaming is termination Streaming Query with Error
这篇关于Spark Streaming 由于与正在读取的主题不同的 Kafka 主题出错而失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!