Spark Structured Streaming from Kafka - last message processed again after resume from checkpoint

Problem Description

I'm using the brand new (and tagged "alpha") Structured Streaming of Spark 2.0.2 to read messages from a Kafka topic and update a couple of Cassandra tables from it:

val readStream = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "maxwell")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
  .as[KafkaMessage]
  .map(<transform KafkaMessage to Company>)

val writeStream = readStream
  .writeStream
  .queryName("CompanyUpdatesInCassandra")
  .foreach(new ForeachWriter[Company] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(company: Company): Unit = {
      ...
    }

    def close(errorOrNull: Throwable): Unit = {}
  })
  .start()

writeStream.awaitTermination()

I also configured a checkpoint location ("spark.sql.streaming.checkpointLocation") on the SparkSession. This allows the query to pick up messages that arrived while the streaming app was down as soon as it resumes.
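
For reference, a minimal sketch of how such a checkpoint location can be set on the SparkSession (the app name and checkpoint path below are hypothetical, not taken from the question):

import org.apache.spark.sql.SparkSession

// The checkpoint location is set once on the session; every streaming query
// then persists its offsets and metadata under this path.
val sparkSession = SparkSession.builder
  .appName("CompanyUpdatesInCassandra")
  .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoints")
  .getOrCreate()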

However, since configuring this checkpoint location I have noticed that on resume it also consistently reprocesses the last message of the previous batch, even though that message was already processed successfully.

Any idea what I'm doing wrong here? This seems like a very common use case.

More information:

See the relevant logs below (offset 5876 being the last offset that was successfully processed by the previous batch):

[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]}
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)]
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map()
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)]
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31)
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0]  1

Also, when I kill the stream, I make sure it is stopped gracefully to avoid data loss:

sys.ShutdownHookThread {
  writeStream.stop()
  sparkSession.stop()
}

Recommended Answer

Currently, Structured Streaming checkpoints its state when a new offset is generated, so the case you describe is expected: the last committed batch may be reprocessed after recovery. However, that is an internal implementation detail. Even if the checkpoint were taken when committing a batch, the checkpoint itself could still fail, so your sink (the ForeachWriter) needs to handle that case anyway.

Generally, your sink should always be idempotent.
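
Since the question writes to Cassandra, one way to get idempotence is to key every write on the row's primary key, so that replaying a message after recovery simply overwrites the same row. A minimal sketch, assuming the spark-cassandra-connector's CassandraConnector and a hypothetical Company case class, keyspace and table:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

case class Company(id: Long, name: String)

class IdempotentCompanyWriter(connector: CassandraConnector) extends ForeachWriter[Company] {
  def open(partitionId: Long, version: Long): Boolean = true

  def process(company: Company): Unit = {
    // A Cassandra INSERT is an upsert on the primary key, so reprocessing the
    // last committed batch after a restart rewrites the same row instead of
    // creating a duplicate.
    connector.withSessionDo { session =>
      session.execute(
        "INSERT INTO mykeyspace.companies (id, name) VALUES (?, ?)",
        Long.box(company.id), company.name)
    }
  }

  def close(errorOrNull: Throwable): Unit = {}
}

Alternatively, open(partitionId, version) can return false for (partition, version) pairs that were already written, which tells Spark to skip processing that partition, but an upsert keyed on the primary key is usually the simpler option.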

Update: as of Spark 2.2.0, Structured Streaming does not rerun a batch after recovery if it was committed successfully.
