Spark Streaming: Kafka group id not permitted in Spark Structured Streaming

Problem Description
I am writing a Spark Structured Streaming application in PySpark to read data from Kafka.
However, the current version of Spark is 2.1.0, which does not allow me to set the group id as a parameter; it generates a unique id for each query instead. But the Kafka connection uses group-based authorization, which requires a pre-set group id.
Hence, is there any workaround to establish the connection without updating Spark to 2.2, since my team does not want to do that?
My code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "host:9092") \
        .option("subscribe", "record") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .load()
    print(lines.isStreaming)  # prints True

    # Cast the Kafka value to a string (assign the result, or it is discarded)
    lines = lines.selectExpr("CAST(value AS STRING)")

    # Split the lines into words
    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()
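For context, Spark 2.1's Kafka source assigns its own consumer group id to every streaming query instead of honoring a user-supplied one. A minimal sketch of that behavior, assuming a "spark-kafka-source-" prefix plus a random suffix (the exact suffix format in Spark's source is an assumption here, for illustration only):

```python
import uuid

def generate_query_group_id():
    # Sketch of how Spark 2.1's Kafka source derives a unique consumer group
    # id per streaming query; the random-UUID suffix is an assumed stand-in
    # for illustration, not Spark's exact implementation.
    return "spark-kafka-source-" + str(uuid.uuid4())

print(generate_query_group_id())
```

Because every query gets a fresh id like this, a broker that authorizes only one fixed group name will reject the connection.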
Answer
The KafkaUtils class will override the parameter value for "group.id". It concatenates "spark-executor-" in front of the original group id.
Below is the code from KafkaUtils where this is done:
// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
  logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
We faced the same problem. Kafka was using ACLs with a pre-set group id, so the only thing to do was to alter the group id in the Kafka configuration: instead of our original group id, we put "spark-executor-" + originalGroupId.
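In other words, the broker-side ACL has to authorize the prefixed group name rather than the original one. A minimal sketch of deriving the group name to authorize, assuming a hypothetical original group id of "dns-consumers":

```python
# Hypothetical original group id; substitute your own.
original_group_id = "dns-consumers"

# KafkaUtils prepends "spark-executor-" on the executors, so the broker's
# ACL must grant access to this derived group name instead.
executor_group_id = "spark-executor-" + original_group_id
print(executor_group_id)  # spark-executor-dns-consumers
```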