Unable to set Kafka Spark consumer configs
Question
I am using spark-sql 2.4.x with the Kafka client.
Even after setting the consumer configuration parameters max.partition.fetch.bytes and max.poll.records, they are not applied and the default values are used instead. This is how I create the stream:
Dataset<Row> df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", server1)
.option("subscribe", TOPIC1)
.option("includeTimestamp", true)
.option("startingOffsets", "latest")
.option("max.partition.fetch.bytes", "2097152") // default 1048576 (1 MiB)
.option("max.poll.records", 6000) // default 500
.option("metadata.max.age.ms", 450000) // default 300000
.option("failOnDataLoss", false)
.load();
When the consumer starts, the log still shows the default values:
[Executor task launch worker for task 21] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
What is the correct way to set these?
Answer
From the documentation:
Kafka's own configurations can be set via DataStreamReader.option with the kafka. prefix, e.g., stream.option("kafka.bootstrap.servers", "host:port"). For possible Kafka parameters, see the Kafka consumer config docs for parameters related to reading data, and the Kafka producer config docs for parameters related to writing data.
I believe you need to add the "kafka." prefix to your Kafka consumer options, like:
.option("kafka.max.poll.records", 6000)
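Because every Kafka consumer property needs the same prefix, one way to avoid missing one is to collect them in a plain map and add the prefix in a single place. The sketch below assumes the resulting map is passed to DataStreamReader.options(java.util.Map<String, String>); the class name KafkaOptionPrefixer and the method withKafkaPrefix are hypothetical and are not part of Spark or Kafka.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Spark or Kafka): copies plain Kafka consumer
 * settings into a new map whose keys carry the "kafka." prefix that the Spark
 * Kafka source uses to forward options to the underlying consumer.
 */
final class KafkaOptionPrefixer {
    static final String PREFIX = "kafka.";

    private KafkaOptionPrefixer() {}

    /** Returns a new map; keys that already start with "kafka." are kept unchanged. */
    static Map<String, String> withKafkaPrefix(Map<String, String> consumerConfigs) {
        Map<String, String> options = new HashMap<>();
        for (Map.Entry<String, String> e : consumerConfigs.entrySet()) {
            String key = e.getKey();
            options.put(key.startsWith(PREFIX) ? key : PREFIX + key, e.getValue());
        }
        return options;
    }

    // Usage sketch (assumes spark-sql-kafka-0-10 on the classpath; server1,
    // TOPIC1 and sparkSession as in the question):
    //   Map<String, String> consumer = new HashMap<>();
    //   consumer.put("bootstrap.servers", server1);
    //   consumer.put("max.partition.fetch.bytes", "2097152");
    //   consumer.put("max.poll.records", "6000");
    //   consumer.put("metadata.max.age.ms", "450000");
    //   Dataset<Row> df = sparkSession.readStream()
    //       .format("kafka")
    //       .options(KafkaOptionPrefixer.withKafkaPrefix(consumer))
    //       .option("subscribe", TOPIC1)          // source option: no prefix
    //       .option("startingOffsets", "latest")  // source option: no prefix
    //       .option("failOnDataLoss", false)      // source option: no prefix
    //       .load();
}
```

Only Kafka consumer properties get the prefix; Spark's own source options such as subscribe, startingOffsets and failOnDataLoss stay unprefixed. Per the Spark Kafka integration guide, a few consumer settings (for example key.deserializer, value.deserializer, enable.auto.commit and auto.offset.reset) are managed by Spark and cannot be overridden this way.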