如何在Structured Streaming的kafka数据源中为消费者组设置group.id? [英] How to set group.id for consumer group in kafka data source in Structured Streaming?
问题描述
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据.这意味着我需要强制使用特定的 group.id.但是,正如文档中所述,这是不可能的.尽管如此,在 databricks 文档 https://docs 中.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl,它说这是可能的.这只是指azure集群吗?
I want to use Spark Structured Streaming to read from a secure kafka. This means that I will need to force a specific group.id. However, as is stated in the documentation this is not possible. Still, in the databricks documentation https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl, it says that it is possible. Does this only refer to the azure cluster?
另外,通过查看 apache/spark repo 的 master 分支的文档 https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md,我们可以理解这样的功能是有意为之将在以后的 spark 版本中添加.你知道这样一个稳定版本的任何计划,这将允许设置消费者 group.id 吗?
Also, by looking at the documentation of the master branch of the apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md, we can understand that such functionality is intended to be added at later spark releases. Do you know of any plans of such a stable release, that is going to allow setting that consumer group.id?
如果没有,Spark 2.4.0 是否有任何解决方法可以设置特定的消费者 group.id?
If not, are there any workarounds for Spark 2.4.0 to be able to set a specific consumer group.id?
推荐答案
目前 (v2.4.0) 无法实现.
Currently (v2.4.0) it is not possible.
您可以在 Apache Spark 项目中检查以下几行:
You can check following lines in Apache Spark project:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - 在用于创建KafkaConsumer
在主分支中,您可以找到修改,可以设置前缀或特定的group.id
In master branch you can find modification, that enable to setting prefix or particular group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - 根据组前缀(groupidprefix
)生成group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - generate group.id based on group prefix (groupidprefix
)
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - 设置先前生成的 groupId,如果 kafka.group.id
未在属性中传递
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - set previously generated groupId, if kafka.group.id
wasn't passed in properties
这篇关于如何在Structured Streaming的kafka数据源中为消费者组设置group.id?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!