Spark Streaming: Kafka group id not permitted in Spark Structured Streaming


Problem description

I am writing a Spark Structured Streaming application in PySpark to read data from Kafka.

However, the current version of Spark is 2.1.0, which does not allow me to set the group id as a parameter; instead it generates a unique id for each query. But the Kafka connection uses group-based authorization, which requires a pre-set group id.

Hence, is there any workaround to establish the connection without updating Spark to 2.2, since my team does not want to do that?

My code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "host:9092") \
        .option("subscribe", "record") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .load()
    print(lines.isStreaming)  # prints True

    # Cast the binary Kafka value to a string; the result must be assigned,
    # since DataFrame transformations return a new DataFrame
    lines = lines.selectExpr("CAST(value AS STRING)")

    # Split the lines into words
    words = lines.select(
        explode(split(lines.value, " ")).alias("word")
    )

    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

Recommended answer

The KafkaUtils class overrides the parameter value for "group.id": it prepends "spark-executor-" to the original group id.

Below is the code from KafkaUtils that does this:

// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
  logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
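In practice, this means the consumer group the broker actually sees is a fixed prefix plus whatever group id you configured. A minimal Python sketch of that derivation (the group name "dns-reader" is just an illustrative placeholder):

```python
# Spark 2.1's KafkaUtils prepends a fixed prefix to the configured group id,
# so the broker sees "spark-executor-<original>" rather than "<original>".
SPARK_EXECUTOR_PREFIX = "spark-executor-"

def executor_group_id(original_group_id):
    """Return the consumer group id the Spark executors will actually use."""
    return SPARK_EXECUTOR_PREFIX + original_group_id

print(executor_group_id("dns-reader"))  # spark-executor-dns-reader
```

Whatever group id your application configures, it is this prefixed name that must be authorized on the broker.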

We faced the same problem. Kafka was based on ACLs with a preset group id, so the only change needed was to alter the group id in the Kafka configuration: instead of our original group id, we authorized "spark-executor-" + originalGroupId.
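On the Kafka side, this amounts to granting the consumer-group ACL to the prefixed name rather than the original one. A hedged sketch using Kafka's stock kafka-acls tool (the principal, ZooKeeper address, topic, and group names are placeholders for your own setup):

```shell
# Grant read access for the group id the Spark executors will actually use:
# "spark-executor-" + the original group id.
kafka-acls --authorizer-properties zookeeper.connect=zk-host:2181 \
  --add \
  --allow-principal User:spark-app \
  --operation Read \
  --topic record \
  --group spark-executor-my-original-group
```

After this grant, the Spark 2.1 application can keep its original group id in its own configuration; the broker-side ACL simply matches the prefixed name that KafkaUtils sends.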

