How to manually set group.id and commit kafka offsets in spark structured streaming?
Problem Description
I was going through the Spark structured streaming - Kafka integration guide here.
This link states:
enable.auto.commit: Kafka source doesn’t commit any offset.
So how do I manually commit offsets once my spark application has successfully processed each record?
Recommended Answer
Current Situation (Spark 2.4.5)
This feature seems to be under discussion in the Spark community: https://github.com/apache/spark/pull/24613.
In that Pull Request you will also find a possible solution for this at https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer.
At the moment, the Spark Structured Streaming + Kafka integration documentation clearly states how it manages Kafka offsets. The most important Kafka specific configurations for offsets are:
- group.id: Kafka source will create a unique group id for each query automatically. According to the code, the group.id will be set to
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
- auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it.
- enable.auto.commit: Kafka source doesn’t commit any offset.
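To make the first bullet concrete, the auto-generated group id simply follows the pattern quoted above. A minimal, self-contained sketch (the metadataPath value here is a hypothetical placeholder; in a real query Spark derives it from the checkpoint location of the source):

```scala
import java.util.UUID

// Hypothetical metadata path, for illustration only.
val metadataPath = "/tmp/sparkCheckpoint/sources/0"

// Same pattern as in Spark's Kafka source code quoted above:
// a fresh UUID per query plus the hash of the metadata path.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

println(uniqueGroupId)
```

This is why `kafka-consumer-groups.sh` later in this answer shows a group named `spark-kafka-source-92ea6f85-[...]`: the name is generated, not user-chosen.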
Therefore, it is currently not possible in Structured Streaming to define a custom group.id for the Kafka consumer; Structured Streaming manages the offsets internally and does not commit them back to Kafka (not even automatically).
Let's say you have a simple Spark Structured Streaming application that reads and writes to Kafka, like this:
// create SparkSession
val spark = SparkSession.builder()
.appName("ListenerTester")
.master("local[1]")
.getOrCreate()
// read from Kafka topic
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testingKafkaProducer")
.option("failOnDataLoss", "false")
.load()
// write to Kafka topic and set checkpoint directory for this stream
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "testingKafkaProducerOut")
.option("checkpointLocation", "/home/.../sparkCheckpoint/")
.start()
Offset Management by Spark
Once this application is submitted and data is being processed, the corresponding offset can be found in the checkpoint directory:
myCheckpointDir/offsets/
{"testingKafkaProducer":{"0":1}}
Here the entry in the checkpoint file confirms that the next offset of partition 0 to be consumed is 1. It implies that the application has already processed offset 0 from partition 0 of the topic named testingKafkaProducer.
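To make the shape of that checkpoint entry concrete, here is a small, self-contained sketch that extracts the partition-to-offset mapping from such a line. The object and method names are made up for illustration, and the regex only handles the simple single-topic shape shown above; a real tool would use a JSON library:

```scala
// Sketch: parse a checkpoint entry such as {"testingKafkaProducer":{"0":1}}
// into a Map(partition -> next offset to consume).
object CheckpointOffsets {
  // Matches quoted partition numbers followed by their offset, e.g. "0":1
  private val partitionOffset = """"(\d+)"\s*:\s*(\d+)""".r

  def parse(entry: String): Map[Int, Long] =
    partitionOffset.findAllMatchIn(entry)
      .map(m => m.group(1).toInt -> m.group(2).toLong)
      .toMap
}

val offsets = CheckpointOffsets.parse("""{"testingKafkaProducer":{"0":1}}""")
// partition 0 -> next offset 1
```

Note that these are the offsets Spark itself will use on restart; they live only in the checkpoint directory, not in Kafka.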
However, as stated in the documentation, the offset is not committed back to Kafka. This can be checked by executing the kafka-consumer-groups.sh script of the Kafka installation:
./kafka/current/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group "spark-kafka-source-92ea6f85-[...]-driver-0"
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testingKafkaProducer 0 - 1 - consumer-1-[...] /127.0.0.1 consumer-1
The current offset for this application is unknown to Kafka as it has never been committed.
What I have seen doing some research on the web is that you could commit offsets in the callback function of the onQueryProgress method in a customized StreamingQueryListener of Spark.
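A minimal sketch of such a listener follows. This is an assumption-laden illustration, not a production implementation: the class name, topic, group id, and bootstrap servers are placeholders I introduced, the endOffset parsing only handles the simple single-topic JSON shape shown earlier, and offsets committed this way are purely informational (Spark still recovers from its own checkpoint, not from Kafka):

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

object KafkaOffsetCommitter {
  // endOffset strings look like {"testingKafkaProducer":{"0":1}};
  // extract partition -> offset with a regex (a JSON library is more robust).
  private val partitionOffset = """"(\d+)"\s*:\s*(\d+)""".r

  def parse(endOffset: String): Map[Int, Long] =
    partitionOffset.findAllMatchIn(endOffset)
      .map(m => m.group(1).toInt -> m.group(2).toLong)
      .toMap
}

// Sketch: after every micro-batch, commit the endOffset reported by Spark
// for the Kafka source back to Kafka under a group id of our choosing.
class KafkaOffsetCommitter(topic: String, groupId: String, servers: String)
    extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val toCommit = event.progress.sources
      .filter(_.description.toLowerCase.contains("kafka"))
      .flatMap(s => KafkaOffsetCommitter.parse(s.endOffset))
      .map { case (partition, offset) =>
        new TopicPartition(topic, partition) -> new OffsetAndMetadata(offset)
      }.toMap

    if (toCommit.nonEmpty) {
      val props = new Properties()
      props.put("bootstrap.servers", servers)
      props.put("group.id", groupId)
      props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")

      // Commit synchronously, then close; a long-lived consumer would be
      // more efficient, but this keeps the sketch simple.
      val consumer = new KafkaConsumer[String, String](props)
      try consumer.commitSync(toCommit.asJava)
      finally consumer.close()
    }
  }
}

// Register the listener before starting the query:
// spark.streams.addListener(
//   new KafkaOffsetCommitter("testingKafkaProducer", "my-manual-group", "localhost:9092"))
```

With this in place, lag-monitoring tools and kafka-consumer-groups.sh can see the application's progress under the chosen group id, even though Spark itself never reads those committed offsets.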
As I will not claim to have developed this myself, here are the most important links that helped me to understand: