Spark Structured Streaming - AssertionError in Checkpoint due to increasing the number of input sources
Question
I am trying to join two streams into one and write the result to a topic.
Code:
1- Reading the two topics:
val PERSONINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xx:9092")
.option("subscribe", "PERSONINFORMATION")
.option("group.id", "info")
.option("maxOffsetsPerTrigger", 1000)
.option("startingOffsets", "earliest")
.load()
val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xxx:9092")
.option("subscribe", "CANDIDATEINFORMATION")
.option("group.id", "candent")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", "false")
.load()
2- Parsing the data to join them:
val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")
val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")
val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
3- Joining the two frames:
val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")
val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))
4- Writing them to a topic:
string2json.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "xxxx:9092")
.option("topic", "toDelete")
.option("checkpointLocation", "checkpoints")
.option("failOnDataLoss", "false")
.start()
.awaitTermination()
Error message:
21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
Answer
Your code looks fine to me; it is rather the checkpointing that is causing the issue.
Based on the error message you are getting, you probably ran this job with only one stream source at first. Then you added the code for the stream-stream join and tried to restart the application without removing the existing checkpoint files. The application now tries to recover from those checkpoint files, but realises that the query originally had only one source while it now has two.
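You can see where the assertion comes from by looking inside the checkpoint directory. The sketch below fabricates a checkpoints/offsets file purely for illustration (the file names follow the structured streaming checkpoint layout; the offset values are made up, not taken from the question): Spark writes one JSON line per input source for each micro-batch, and on restart it compares that line count against the number of sources in the current query.

```shell
# Illustrative only: recreate the shape of a checkpoint offsets file for a
# query that originally had a single Kafka source.
mkdir -p checkpoints/offsets
cat > checkpoints/offsets/0 <<'EOF'
v1
{"batchWatermarkMs":0,"batchTimestampMs":1611572501000}
{"CANDIDATEINFORMATION":{"0":1000}}
EOF

# Lines after the version and metadata headers correspond to input sources.
# Here only one source line was recorded, so restarting the same checkpoint
# with a two-source join trips the "[1] sources ... now there are [2]" check.
tail -n +3 checkpoints/offsets/0 | wc -l
```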
The section Recovery Semantics after Changes in a Streaming Query in the Spark documentation explains which changes are and are not allowed when using checkpointing. Changing the number of input sources is not allowed:
"Changes in the number or type (i.e. different source) of input sources: This is not allowed."
To solve your problem: delete the current checkpoint files and restart the job.
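A minimal sketch of the cleanup, assuming the checkpoint path is the relative "checkpoints" directory used in the writeStream options above (adjust the path if yours differs):

```shell
# Remove the stale checkpoint so the restarted two-source query writes
# fresh offset/commit metadata instead of recovering one-source state.
rm -rf checkpoints

# If the checkpoint lives on HDFS instead of the local filesystem:
#   hdfs dfs -rm -r /path/to/checkpoints
```

Alternatively, point checkpointLocation at a brand-new directory when restarting; either way the query starts from a checkpoint that matches the new two-source plan, and it will reprocess from startingOffsets since the old progress is discarded.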