使用Kafka进行Spark结构化流式传输只会产生一批(Pyspark) [英] Spark structured streaming with kafka leads to only one batch (Pyspark)

查看:81
本文介绍了使用Kafka进行Spark结构化流式传输只会产生一批(Pyspark)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码,我想知道为什么它只生成一批:

I have the following code and I'm wondering why it generates only one batch:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "IP").option("subscribe", "Topic").option("startingOffsets","earliest").load()
// groupby on slidings windows
query = slidingWindowsDF.writeStream.queryName("bla").outputMode("complete").format("memory").start()

使用以下参数启动该应用程序:

The application is launched with the following parameters:

spark.streaming.backpressure.initialRate 5
spark.streaming.backpressure.enabled True

kafka主题包含大约1100万条消息.我期望由于initialRate参数它至少应该生成两个批处理,但是它只能生成一个.谁能说出为什么spark只一批处理我的代码吗?

The kafka topic contains around 11 million messages. I'm expecting that it should at least generate two batches due to the initialRate parameter, but it generates only one. Can anyone tell why spark is processing my code in only one batch?

我正在使用Spark 2.2.1和Kafka 1.0.

I'm using Spark 2.2.1 and Kafka 1.0.

推荐答案

这是因为 spark.streaming.backpressure.initialRate 参数仅由旧的Spark Streaming使用,而不由结构化Streaming使用.

That is because spark.streaming.backpressure.initialRate parameter is used only by old Spark Streaming, not Structured Streaming.

相反,请使用 maxOffsetsPerTrigger : http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

顺便说一句,另请参阅以下答案: Spark结构化流如何处理背压?,SSS现在没有完整的反压支持

BTW, see also this answer: How Spark Structured Streaming handles backpressure?, SSS now don't have full backpressure support

这篇关于使用Kafka进行Spark结构化流式传输只会产生一批(Pyspark)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆