Spark Structured Streaming Kafka Microbatch count


Problem Description

I am using Spark Structured Streaming to read records from a Kafka topic; I intend to count the number of records received in each micro-batch in the Spark readStream.

Here is a snippet:

val kafka_df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "test-count")
  .load()

I understand from the docs that kafka_df will be lazily evaluated when a streamingQuery is started (to come next), and that as it is evaluated, it holds a micro-batch. So I figured that doing a groupBy on topic followed by a count should work.

Like this:

val counter = kafka_df
             .groupBy("topic")
             .count()

Now, to evaluate all of this, we need a streamingQuery, let's say a console-sink query, to print it on the console. And this is where I see the problem. A streamingQuery on an aggregated DataFrame, such as counter above, works only with outputMode complete/update and not with append.

This effectively means that the count reported by the streamingQuery is cumulative.

Like this:

val counter_json = counter.toJSON   // serialize each row of the aggregate as JSON
val count_query = counter_json
  .writeStream
  .outputMode("update")
  .format("console")
  .start()             // kicks off the lazy evaluation
  .awaitTermination()

In a controlled set-up, where:
actual published records: 1500
actual received micro-batches: 3
actual received records: 1500

The count of each micro-batch is supposed to be 500, so I hoped (wished) that the query would print to the console:

topic: test-count
count: 500
topic: test-count
count: 500
topic: test-count
count: 500

But it doesn't. It actually prints:

topic: test-count
count: 500
topic: test-count
count: 1000
topic: test-count
count: 1500

This, I understand, is because of the outputMode complete/update (cumulative).

My question: Is it possible to accurately get the count of each micro-batch in Spark-Kafka Structured Streaming?

From the docs, I found out about the watermark approach (to support append):

val windowedCounts = kafka_df
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"topic")
  .count()

val console_query = windowedCounts
  .writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()

But the results of this console_query are inaccurate and appear way off the mark.

TL;DR - Any thoughts on accurately counting the records in a Spark-Kafka micro-batch would be appreciated.

Recommended Answer

"TL; DR-在Spark-Kafka微批处理中准确计数记录的任何想法都将受到赞赏."

"TL;DR - Any thoughts on accurately counting the records in Spark-Kafka micro-batch would be appreciated."

You can use a StreamingQueryListener. This allows you to print out the exact number of rows that were received from the subscribed Kafka topic. The onQueryProgress API gets called during every micro-batch and contains lots of useful meta information about your query. If no data is flowing into the query, onQueryProgress is called every 10 seconds. Below is a simple example that prints out the number of input messages.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Register a listener; onQueryProgress fires once per completed micro-batch.
spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {}

    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {}

    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
      // numInputRows is the number of records ingested in this micro-batch
      println("NumInputRows: " + queryProgress.progress.numInputRows)
    }
  })

In case you are validating the performance of your Structured Streaming query, it is usually best to keep an eye on the following two metrics (a small sketch follows the list):

  • queryProgress.progress.inputRowsPerSecond
  • queryProgress.progress.processedRowsPerSecond
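
For illustration, here is a minimal sketch of a listener that logs both rates. The field names come from Spark's StreamingQueryProgress; the comparison and the warning message are only assumptions made for this example:

// Sketch: compare the ingest rate against the processing rate per micro-batch.
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"inputRowsPerSecond:     ${p.inputRowsPerSecond}")
    println(s"processedRowsPerSecond: ${p.processedRowsPerSecond}")
    // If the ingest rate consistently exceeds the processing rate,
    // the query is falling behind its Kafka source.
    if (p.inputRowsPerSecond > p.processedRowsPerSecond) {
      println("Warning: input rate exceeds processing rate")
    }
  }
})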

In case input is higher than processed, you might increase the resources for your job or reduce the maximum limit (by reducing the readStream option maxOffsetsPerTrigger). If processed is higher, you may want to increase this limit.
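
As a rough sketch of the throttling side, maxOffsetsPerTrigger can be set directly as a readStream option on the Kafka source; the variable name throttled_df and the limit of 500 below are only illustrative, not a recommendation:

// Sketch: cap each micro-batch at roughly 500 Kafka offsets per trigger.
// Tune the value against inputRowsPerSecond / processedRowsPerSecond.
val throttled_df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "test-count")
  .option("maxOffsetsPerTrigger", 500)   // upper bound on offsets read per micro-batch
  .load()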
