Spark Structured Streaming - AssertionError in Checkpoint due to increasing the number of input sources


Problem description


I am trying to join two streams into one and write the result to a topic


code: 1- Reading two topics

val PERSONINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xx:9092")
    .option("subscribe", "PERSONINFORMATION")
    .option("group.id", "info")
    .option("maxOffsetsPerTrigger", 1000)
    .option("startingOffsets", "earliest")
    .load()


val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "CANDIDATEINFORMATION")
    .option("group.id", "candent")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .option("failOnDataLoss", "false")
    .load()


2- Parse data to join them:

val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
    .select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s"))
    .select("s.*")

val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
    .select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s"))
    .select("s.*")

val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
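
The schemas schemaPERSONINFORMATION and schemaCANDIDATEINFORMATION are not shown in the question. A hypothetical shape, inferred only from the columns referenced in the join below, might look like this (the field placement is a guess):

import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical schemas, inferred from the columns used later in the join;
// the real definitions are not shown in the question.
val schemaPERSONINFORMATION = new StructType()
  .add("ID", StringType)
  .add("FULLNAME", StringType)   // assumed to live on the person side

val schemaCANDIDATEINFORMATION = new StructType()
  .add("ID", StringType)
  .add("PERSONID", StringType)   // foreign key to PERSONINFORMATION.ID
  .add("PERSONALID", StringType) // assumed to live on the candidate side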

3- Join the two frames

val joined_df: DataFrame = df_candidate
    .join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"), "inner")

val string2json: DataFrame = joined_df.select(
    $"dfcandidate.ID".as("key"),
    to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))

4- Write them to a topic

string2json.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "xxxx:9092")
    .option("topic", "toDelete")
    .option("checkpointLocation", "checkpoints")
    .option("failOnDataLoss", "false")
    .start()
    .awaitTermination()

Error message:

    21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
       

Recommended Answer


Your code looks fine to me, it is rather the checkpointing that is causing the issue.


Based on the error message you are getting, you probably ran this job with only one stream source. Then you added the code for the stream join and tried to restart the application without removing the existing checkpoint files. Now the application tries to recover from the checkpoint files but realises that you initially had only one source and now you have two sources.


The section Recovery Semantics after Changes in a Streaming Query in the Spark documentation explains which changes are and are not allowed when using checkpointing. Changing the number of input sources is not allowed:


"Changes in the number or type (i.e. different source) of input sources: This is not allowed."


To solve your problem: delete the current checkpoint files and restart the job.
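
A minimal sketch of that cleanup, assuming the relative "checkpoints" path from the sink configuration above and the default filesystem of the Spark session; deleting the directory from the shell (e.g. hdfs dfs -rm -r checkpoints) achieves the same:

import org.apache.hadoop.fs.{FileSystem, Path}

// Remove the stale checkpoint directory before restarting the query.
// Assumes the "checkpoints" path used by the writeStream above; adjust
// for your actual filesystem (HDFS, S3, local).
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path("checkpoints"), true) // true = recursive delete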
