How to manually set group.id and commit Kafka offsets in Spark Structured Streaming?

Problem Description

I was going through the Spark Structured Streaming - Kafka integration guide here.

In this link it is stated:

enable.auto.commit: Kafka source doesn't commit any offset.

So how do I manually commit offsets once my Spark application has successfully processed each record?

Recommended Answer

tl;dr

It is not possible to commit any messages to Kafka. Starting with Spark version 3.x, you can define the name of the Kafka consumer group; however, this still does not allow you to commit any messages.

According to the Structured Kafka Integration Guide, you can provide the ConsumerGroup as the option kafka.group.id:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // available since Spark 3.x: set a custom consumer group name
  .option("kafka.group.id", "myConsumerGroup")
  .load()

However, Spark will still not commit any offsets back, so you will not be able to "manually" commit offsets to Kafka. This feature is meant to deal with Kafka's latest feature, Authorization using Role-Based Access Control, for which your ConsumerGroup usually needs to follow naming conventions.

A full example of a Spark 3.x application is discussed and solved here.

The Spark Structured Streaming + Kafka Integration Guide clearly states how it manages Kafka offsets: Spark will not commit any messages back to Kafka, as it relies on internal offset management for fault tolerance.

The most important Kafka configurations for managing offsets are:

  • group.id: The Kafka source will automatically create a unique group id for each query. According to the code, the group.id is set to

    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

  • auto.offset.reset: Set the source option startingOffsets to specify where to start instead (see the sketch after this list). Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it.

  • enable.auto.commit: The Kafka source doesn't commit any offset.
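
To illustrate the startingOffsets option, here is a minimal sketch (the broker address, topic name, and offsets in the JSON are made up for this example; in the JSON, -2 refers to the earliest offset and -1 to the latest):

val dfFromOffsets = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  // explicit start positions per partition: topic -> partition -> offset
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .load()
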
Therefore, in Structured Streaming (prior to Spark 3.x) it is not possible to define a custom group.id for the Kafka consumer, and Structured Streaming manages the offsets internally, not committing them back to Kafka (also not automatically).

Let's say you have a simple Spark Structured Streaming application that reads from and writes to Kafka, like this:

import org.apache.spark.sql.SparkSession

// create SparkSession
val spark = SparkSession.builder()
  .appName("ListenerTester")
  .master("local[*]")
  .getOrCreate()

// read from Kafka topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "testingKafkaProducer")
  .option("failOnDataLoss", "false")
  .load()

// write to Kafka topic and set checkpoint directory for this stream
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "testingKafkaProducerOut")
  .option("checkpointLocation", "/home/.../sparkCheckpoint/")
  .start()

Offset Management by Spark

Once this application is submitted and data is being processed, the corresponding offsets can be found in the checkpoint directory:

myCheckpointDir/offsets/

{"testingKafkaProducer":{"0":1}}

Here, the entry in the checkpoint file confirms that the next offset to be consumed from partition 0 is 1. It implies that the application has already processed offset 0 from partition 0 of the topic named testingKafkaProducer.

More on the fault-tolerance semantics is given in the Spark Documentation.

However, as stated in the documentation, the offset is not committed back to Kafka. This can be checked by running the kafka-consumer-groups.sh tool from the Kafka installation:

./kafka/current/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group "spark-kafka-source-92ea6f85-[...]-driver-0"

TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST         CLIENT-ID
testingKafkaProducer 0          -               1               -    consumer-1-[...] /127.0.0.1   consumer-1

The current offset for this application is unknown to Kafka, as it has never been committed.

Please carefully read the comments below from Spark committer @JungtaekLim about the workaround: "Spark's fault tolerance guarantee is based on the fact Spark has a full control of offset management, and they're voiding the guarantee if they're trying to modify it. (e.g. If they change to commit offset to Kafka, then there's no batch information and if Spark needs to move back to the specific batch "behind" guarantee is no longer valid.)"

What I have seen in some research on the web is that you could commit offsets in the callback of the onQueryProgress method in a customized StreamingQueryListener of Spark. That way, you could have a consumer group that keeps track of the current progress. However, its progress is not necessarily aligned with the actual consumer group.
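
Below is a minimal sketch of that workaround, assuming the example application above (broker localhost:9092). The class name OffsetCommitListener, the tracking group name, and the use of json4s (bundled with Spark) to parse the progress JSON are illustrative choices, not an official API for committing Spark's offsets; the committed offsets merely mirror Spark's own progress:

import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Sketch: after each micro-batch, parse the Kafka source's endOffset JSON
// (e.g. {"testingKafkaProducer":{"0":1}}) and commit those offsets to Kafka
// under a separate, self-chosen "tracking" consumer group.
class OffsetCommitListener(bootstrapServers: String, trackingGroup: String)
    extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    implicit val formats: Formats = DefaultFormats
    event.progress.sources.foreach { source =>
      // endOffset is a JSON string mapping topic -> partition -> next offset
      val offsets = parse(source.endOffset).extract[Map[String, Map[String, Long]]]
      val toCommit = new java.util.HashMap[TopicPartition, OffsetAndMetadata]()
      for ((topic, parts) <- offsets; (partition, offset) <- parts)
        toCommit.put(new TopicPartition(topic, partition.toInt), new OffsetAndMetadata(offset))

      // a plain KafkaConsumer is used here only as a vehicle to commit offsets
      val props = new Properties()
      props.put("bootstrap.servers", bootstrapServers)
      props.put("group.id", trackingGroup)
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      val consumer = new KafkaConsumer[String, String](props)
      try consumer.commitSync(toCommit) finally consumer.close()
    }
  }
}

// register it before starting the query:
// spark.streams.addListener(new OffsetCommitListener("localhost:9092", "sparkTrackingGroup"))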

Here are some links you may find helpful:

SO discussion around offset management

General description of the StreamingQueryListener
