Spark Structured Streaming Kafka Microbatch 计数 [英] Spark Structured Streaming Kafka Microbatch count

查看:51
本文介绍了Spark Structured Streaming Kafka Microbatch 计数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Spark 结构化流从 Kafka 主题读取记录;我打算计算 Spark readstream

I am using Spark structured streaming to read records from a Kafka topic; I intend to count the number of records received in each 'Micro batch' in Spark readstream

这是一个片段:

val kafka_df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "test-count")
  .load()

我从文档中了解到 kafka_df 将在 streamingQuery 开始(接下来)时被懒惰地评估,并且在评估时,它持有一个微批次.所以,我认为在 topic 上做一个 groupBy 后跟一个 count 应该可以工作.

I understand from the docs that kafka_df will be lazily evaluated when a streamingQuery is started (to come next), and as it is evaluated, it holds a micro-batch. So, I figured doing a groupBy on topic followed by a count should work.

像这样:

val counter = kafka_df
             .groupBy("topic")
             .count()

现在要评估所有这些,我们需要一个 streaminQuery,比如说,一个控制台接收器查询以在控制台上打印它.这就是我看到问题的地方.aggregate 数据帧上的流查询,例如 kafka_df 仅适用于 outputMode complete/update 而不是 append.

Now to evaluate all of this, we need a streaminQuery, lets say, a console sink query to print it on the console. And this is where i see the problem. A streamingQuery on aggregate DataFrames, such as kafka_df works only with outputMode complete/update and not on append.

这实际上意味着,streamingQuery 报告的计数是累积的.

This effectively means that, the count reported by the streamingQuery is cumulative.

像这样:

 val counter_json = counter.toJSON   //to jsonify 
 val count_query = counter_json
                   .writeStream.outputMode("update")
                   .format("console")
                   .start()          // kicks of lazy evaluation
                   .awaitTermination()  

在受控设置中,其中:
实际发布记录:1500
实际收到的微批次:3
a实际接收记录:1500

In a controlled set up, where:
actual Published records: 1500
actual Received micro-batches : 3
aActual Received records: 1500

每个微批次的计数应该是 500,所以我希望(希望)查询打印到控制台:

The count of each microbatch is supposed to be 500, so I hoped (wished) that the query prints to console:

主题:测试计数
计数:500
主题:测试计数
计数:500
主题:测试计数
计数:500

topic: test-count
count: 500
topic: test-count
count: 500
topic: test-count
count: 500

但事实并非如此.它实际上打印:

But it doesn't. It actually prints:

主题:测试计数
计数:500
主题:测试计数
计数:1000
主题:测试计数
计数:1500

topic: test-count
count: 500
topic: test-count
count:1000
topic: test-count
count: 1500

我理解这是因为'outputMode'完成/更新(累积)

This I understand is because of 'outputMode' complete/update (cumulative)

我的问题:Spark-Kafka 结构化流式传输是否可以准确获取每个 micro-batch 的计数?

My question: Is it possible to accurately get the count of each micro-batch is Spark-Kafka structured streaming?

从文档中,我发现了水印方法(支持追加):

From the docs, I found out about the watermark approach (to support append):

val windowedCounts = kafka_df
                    .withWatermark("timestamp", "10 seconds")
                    .groupBy(window($"timestamp", "10 seconds", "10       seconds"), $"topic")
                    .count()

 val console_query = windowedCounts
                    .writeStream
                    .outputMode("append")
                    .format("console")
                    .start()
                    .awaitTermination()

但是这个 console_query 的结果是不准确的,并且看起来有点离谱.

But the results of this console_query are inaccurate and appears is way off mark.

TL;DR - 任何关于在 Spark-Kafka 微批次中准确计算记录的想法都将不胜感激.

TL;DR - Any thoughts on accurately counting the records in Spark-Kafka micro-batch would be appreciated.

推荐答案

如果您只想在使用 Kafka 的结构化流应用程序中的每个触发器处理特定数量的记录,请使用选项 maxOffsetsPerTrigger

If you want to only process a specific number of records with every trigger within a Structured Streaming application using Kafka, use the option maxOffsetsPerTrigger

val kafka_df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "test-count")
  .option("maxOffsetsPerTrigger", 500)
  .load()

这篇关于Spark Structured Streaming Kafka Microbatch 计数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆