Spark Streaming failing due to error on a different Kafka topic than the one being read


Problem description

For the following write-topic/read-topic air2008rand tandem:

// Imports needed to run this in spark-shell: count() and the 'symbol column syntax.
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "air2008rand")
  .load()
  .groupBy('value.cast("string").as('key))
  .agg(count("*").cast("string") as 'value)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest") // read-side option; ignored by the Kafka sink
  .option("includeTimestamp", true)
  .option("topic", "t1")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .outputMode("update")
  .option("checkpointLocation", "/tmp/cp")
  .start)

An error is generated due to a different topic, air2008m1-0:

scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0] 
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone. 
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
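
As the error message itself suggests, the immediate failure can be suppressed by setting the Kafka source option failOnDataLoss to "false". A minimal sketch, reusing the broker and topic from above; note that this only masks the symptom, while the root cause (a stale checkpoint) is addressed in the answer below:

// Tolerate missing/aged-out data instead of failing the query.
// This hides the symptom only; see the answer below for the root cause.
(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "air2008rand")
  .option("failOnDataLoss", "false") // the source option named in the error
  .load())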

This behavior is repeatable by stopping the read/write code (in the spark-shell REPL) and then re-running it.

Why is there "cross-talk" between different Kafka topics here?

Answer

The problem is due to the checkpoint directory containing state from an earlier Spark Streaming query, one that read the air2008m1 topic named in the error. The resolution is to change the checkpoint directory.
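
A minimal sketch of the fix, assuming the same local broker and topics as above (the UUID-suffixed path is an illustrative choice, not from the original answer): point the query at a fresh checkpoint directory so offsets recorded by the earlier query cannot be replayed:

// Give each distinct query its own checkpoint directory so that state
// written by an earlier, unrelated query is never picked up.
import java.util.UUID
val freshCheckpoint = s"/tmp/cp-${UUID.randomUUID()}"

(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "air2008rand")
  .load()
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "t1")
  .option("checkpointLocation", freshCheckpoint) // fresh directory, not the reused /tmp/cp
  .start)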

The solution was found in a comment (from @jaceklaskowski himself) on this question: [IllegalStateException]: Spark Structured Streaming is termination Streaming Query with Error
