How to specify the group id of kafka consumer for spark structured streaming?


Question

I would like to run 2 Spark Structured Streaming jobs in the same EMR cluster to consume the same Kafka topic. Both jobs are in the running status, but only one job can get the Kafka data. My configuration for the Kafka part is as follows.

        .format("kafka")
        .option("kafka.bootstrap.servers", "xxx")
        .option("subscribe", "sametopic")
        .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "./cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
        .load()

I did not set group.id. I guess the same group id is used in both jobs, which causes this issue. However, when I set group.id, it complains that "user-specified consumer groups are not used to track offsets". What is the correct way to solve this problem? Thanks!

Answer

You need to run Spark v3.

From https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html:

kafka.group.id

The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the same group id are likely to interfere with each other, causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting the option "kafka.session.timeout.ms") to be very small. When this is set, the option "groupIdPrefix" will be ignored.
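Putting the doc text together with the original snippet, a Spark 3.x source definition might look like the following sketch. This is a configuration fragment, not a tested implementation: the group name, session timeout value, and broker address are placeholders, and the security options from the question are omitted for brevity.

```scala
// Sketch for Spark 3.x only (kafka.group.id is ignored/rejected on 2.x).
// "authorized-group-job1" is a placeholder; each job should use its own
// group id, since queries sharing a group id split the data between them.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxx")
  .option("subscribe", "sametopic")
  // Explicit, pre-authorized consumer group for this query.
  .option("kafka.group.id", "authorized-group-job1")
  // Per the docs above, keep the session timeout small to limit
  // interference when queries are started/restarted in quick succession.
  .option("kafka.session.timeout.ms", "10000")
  .load()
```

If you only need distinct (not specifically authorized) group names, the "groupIdPrefix" option can be set instead, letting Spark generate a unique group id per query; note that "groupIdPrefix" is ignored whenever "kafka.group.id" is set.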

