Setting spark.streaming.kafka.maxRatePerPartition for createDirectStream


Question

I need to increase the input rate per partition for my application, and I have used .set("spark.streaming.kafka.maxRatePerPartition",100) in the config. The stream duration is 10s, so I expect to process 5*100*10=5000 messages in this batch. However, the input rate I receive is only about 500. Can you suggest any modifications to increase this rate?

Recommended answer

"The stream duration is 10s, so I expect to process 5*100*10=5000 messages in this batch."

That's not what the setting means. It means "how many elements each partition can have per batch", not per second. I'm going to assume you have 5 partitions, so you're getting 5 * 100 = 500. If you want 5000, set maxRatePerPartition to 1000.

From "Exactly-once Spark Streaming From Apache Kafka" (written by Cody, the author of the Direct Stream approach; emphasis mine):

"For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch."

After @avrs's comment, I looked inside the code that defines the max rate. As it turns out, the heuristic is a bit more complex than stated in both the blog post and the docs.

There are two branches. If backpressure is enabled alongside maxRate, the effective rate is the minimum of the current backpressure rate calculated by the RateEstimator object and the maxRate set by the user. If backpressure isn't enabled, the code takes the maxRate as defined.
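As a rough illustration of those two branches, here is a minimal sketch. It is a simplified model, not Spark's actual code: the object name RateSelection, the function effectiveRate, and the use of an Option to stand for "backpressure enabled with a current estimate" are all assumptions made for this example.

```scala
// Simplified model of the two branches described above; NOT Spark's actual code.
object RateSelection {
  /** Picks the per-partition rate in records per second.
    *
    * @param backpressureRate hypothetical stand-in for the RateEstimator's current
    *                         value; None models "backpressure not enabled"
    * @param maxRate          the user's spark.streaming.kafka.maxRatePerPartition
    */
  def effectiveRate(backpressureRate: Option[Long], maxRate: Long): Long =
    backpressureRate match {
      case Some(rate) => math.min(rate, maxRate) // backpressure on: smaller value wins
      case None       => maxRate                 // backpressure off: maxRate as is
    }

  def main(args: Array[String]): Unit = {
    println(effectiveRate(Some(40L), 100L))  // 40: the estimator is the tighter bound
    println(effectiveRate(Some(250L), 100L)) // 100: the user's cap is the tighter bound
    println(effectiveRate(None, 100L))       // 100: no backpressure, cap used directly
  }
}
```

In this model, enabling backpressure can only lower the rate below maxRate, never raise it above.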

Now, after selecting the rate, the code always multiplies it by the batch duration in seconds, which effectively makes the setting a rate per second:

if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
} else {
  None
}
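To see what this arithmetic gives for the numbers in the question, the following sketch reproduces the same calculation outside Spark. The object name BatchCap, the plain String partition keys (standing in for Kafka's topic-partition type), and the explicit batchDurationMs parameter are assumptions made for illustration.

```scala
// Standalone re-creation of the arithmetic in the snippet above; names are hypothetical.
object BatchCap {
  /** Converts per-second limits into per-batch caps, one entry per partition.
    * Returns None when the limits sum to zero, i.e. no cap applies.
    */
  def maxMessagesPerPartition(
      ratePerPartition: Map[String, Long],
      batchDurationMs: Long): Option[Map[String, Long]] =
    if (ratePerPartition.values.sum > 0) {
      val secsPerBatch = batchDurationMs.toDouble / 1000
      Some(ratePerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }

  def main(args: Array[String]): Unit = {
    // 5 partitions, limit of 100 records/second each, 10-second batches.
    val limits = (0 until 5).map(i => s"topic-$i" -> 100L).toMap
    val caps = maxMessagesPerPartition(limits, batchDurationMs = 10000L)
    println(caps.map(_.values.sum)) // Some(5000)
  }
}
```

Under these assumptions each partition is capped at 1000 records per batch, 5000 in total. This is only an upper bound: a batch can contain fewer records if less data is available or if backpressure lowers the effective rate.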
