Spark Structured Streaming Kafka Microbatch count
Problem description
I am using Spark Structured Streaming to read records from a Kafka topic; I intend to count the number of records received in each micro-batch in the Spark readStream.
Here is a snippet:
val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.load()
I understand from the docs that kafka_df will be lazily evaluated when a streamingQuery is started (to come next), and as it is evaluated, it holds a micro-batch. So, I figured doing a groupBy on topic followed by a count should work.
Like this:
val counter = kafka_df
.groupBy("topic")
.count()
Now to evaluate all of this, we need a streamingQuery, let's say, a console sink query to print it on the console. And this is where I see the problem. A streamingQuery on aggregated DataFrames, such as counter above, works only with outputMode complete/update and not with append.
This effectively means that the count reported by the streamingQuery is cumulative.
Like this:
val counter_json = counter.toJSON // to jsonify
val count_query = counter_json
.writeStream.outputMode("update")
.format("console")
.start() // kicks off lazy evaluation
.awaitTermination()
In a controlled set-up, where:
Actual published records: 1500
Actual received micro-batches: 3
Actual received records: 1500
The count of each micro-batch is supposed to be 500, so I hoped (wished) that the query would print to the console:
topic: test-count
count: 500
topic: test-count
count: 500
topic: test-count
count: 500
But it doesn't. It actually prints:
topic: test-count
count: 500
topic: test-count
count: 1000
topic: test-count
count: 1500
This, I understand, is because of the 'outputMode' complete/update (cumulative).
My question: Is it possible to accurately get the count of each micro-batch in Spark-Kafka Structured Streaming?
From the docs, I found out about the watermark approach (to support append):
val windowedCounts = kafka_df
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"topic")
.count()
val console_query = windowedCounts
.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
But the results of this console_query are inaccurate and appear way off the mark.
TL;DR - Any thoughts on accurately counting the records in a Spark-Kafka micro-batch would be appreciated.
Recommended answer
If you want to only process a specific number of records with every trigger within a Structured Streaming application using Kafka, use the option maxOffsetsPerTrigger
val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.option("maxOffsetsPerTrigger", 500)
.load()
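With maxOffsetsPerTrigger set to 500, each trigger reads at most 500 offsets from Kafka, so the 1500 published records above would be consumed in three micro-batches of 500 records each, making the per-batch size predictable.
If the goal is to read the exact record count of each micro-batch directly, a minimal sketch of another common approach (not part of the original answer; assumes Spark 2.4+ where foreachBatch is available, and the name per_batch_query is made up for illustration) is foreachBatch, which hands each micro-batch to your code as a regular DataFrame, so a plain count() there is the size of that batch rather than a running total:
import org.apache.spark.sql.DataFrame

// Sketch only: foreachBatch exposes each micro-batch as a plain DataFrame,
// so count() here is the exact record count of that batch, not cumulative.
val per_batch_query = kafka_df
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"micro-batch $batchId: ${batchDF.count()} records")
  }
  .start()

per_batch_query.awaitTermination()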