Spark Streaming failing due to error on a different Kafka topic than the one being read

Problem description

For the following write/read tandem on the topic air2008rand:

import org.apache.spark.sql.functions.count
import org.apache.spark.sql.streaming.Trigger
// spark.implicits._ is already in scope in spark-shell; it provides the 'symbol column syntax

(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "air2008rand")
  .load()
  // count the occurrences of each distinct message value
  .groupBy('value.cast("string").as('key))
  .agg(count("*").cast("string") as 'value)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest") // read-side option; ignored by the Kafka sink
  .option("includeTimestamp", true)
  .option("topic", "t1")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .outputMode("update")
  .option("checkpointLocation", "/tmp/cp")
  .start)

An error is produced that refers to a different topic, air2008m1-0:

scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0] 
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone. 
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
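
The last lines of the message point at a blunt workaround: setting the Kafka source option failOnDataLoss to "false" makes the reader skip over missing offsets instead of failing. A minimal sketch of where that option goes on the reader (it suppresses the failure, but does not explain the cross-topic reference):

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "air2008rand")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false") // do not fail when checkpointed offsets are no longer in Kafka
  .load()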

This behavior is repeatable by stopping the read/write code (in the spark-shell REPL) and then re-running it.

Why is there "cross-talk" between different Kafka topics here?

Answer

The problem is due to the checkpoint directory containing data from an earlier Spark streaming operation. The resolution is to change the checkpoint directory.
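
The "cross-talk" is not between topics at all: the offsets log inside the checkpoint directory records the topic-partitions of whichever query originally wrote it, and on restart the Kafka source tries to resume exactly those. Roughly what such a file looks like (the contents below are illustrative; the exact JSON depends on the query and Spark version):

$ cat /tmp/cp/offsets/0
v1
{"batchWatermarkMs":0,"batchTimestampMs":1563103642000}
{"air2008m1":{"0":42}}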

The solution was found in a comment (from @jaceklaskowski himself) on this question: [IllegalStateException]: Spark Structured Streaming is terminating Streaming Query with Error
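
A minimal sketch of the fix, assuming the same query as above: the only change is pointing checkpointLocation at a fresh, unused directory (the path /tmp/cp-air2008rand is illustrative; deleting the stale /tmp/cp before restarting works too):

import org.apache.spark.sql.functions.count
import org.apache.spark.sql.streaming.Trigger

(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "air2008rand")
  .load()
  .groupBy('value.cast("string").as('key))
  .agg(count("*").cast("string") as 'value)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "t1")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .outputMode("update")
  .option("checkpointLocation", "/tmp/cp-air2008rand") // fresh directory, not the stale /tmp/cp
  .start)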
