Spark Streaming:如何向 DStream 添加更多分区? [英] Spark Streaming: How can I add more partitions to my DStream?

查看:80
本文介绍了Spark Streaming:如何向 DStream 添加更多分区?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个像这样的火花流应用:

val message = KafkaUtils.createStream(...).map(_._2)message.foreachRDD( rdd => {如果(!rdd.isEmpty){val kafkaDF = sqlContext.read.json(rdd)kafkaDF.foreachPartition(我 =>{创建连接()i.foreach(行 =>{connection.sendToTable()})关闭连接()})

而且,我使用

在纱线集群上运行它

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

当我尝试记录 kafkaDF.rdd.partitions.size 时,结果主要是1"或5".我很困惑,是否可以控制我的 DataFrame 的分区数量?KafkaUtils.createStream 似乎不接受与我想要的 rdd 分区数相关的任何参数.我尝试了 kafkaDF.rdd.repartition( int ),但它似乎也不起作用.

如何在我的代码中实现更多的并行性?如果我的方法是错误的,那么实现它的正确方法是什么?

解决方案

在 Spark Streaming 中,并行性可以在两个方面实现:(a) 消费者/接收者(在您的情况下是 Kafka 消费者),以及 (b)处理(由 Spark 完成).

默认情况下,Spark Streaming 会为每个消费者分配一个核心(又名线程).因此,如果您需要摄取更多数据,则需要创建更多消费者.每个消费者都会创建一个 DStream.然后您可以联合 DStreams 以获得一个大流.

//一个有两个线程供消费者使用的基本示例val messageStream1 = KafkaUtils.createStream(...)//比如说,读取主题 Aval messageStream2 = KafkaUtils.createStream(...)//还有这个阅读主题 Bval combineStream = messageStream1.union(messageStream2)

或者,可以通过重新分区输入流来增加接收者/消费者的数量:

inputStream.repartition(<分区数>))

流媒体应用可用的所有剩余内核都将分配给 Spark.

因此,如果您有 N 个内核(通过 spark.cores.max 定义)并且您有 C 消费者,那么您只剩下 NC 可用于 Spark 的内核.

#Partitions =~ #Consumers x(批量持续时间/区块间隔)

块间隔 = 消费者在将其创建的数据作为火花块推送之前等待的时间(定义为配置 spark.streaming.blockInterval).

永远记住,Spark Streaming 有两个不断发生的功能.一组读取当前微批次(消费者)的线程,以及一组处理前一个微批次(Spark)的线程.

有关更多性能调优技巧,请参阅此处, 此处此处.>

I have a spark-streaming app which looks like this:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )

And, I run it on a yarn cluster using

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

When I try to log kafkaDF.rdd.partitions.size, the result turns out be '1' or '5' mostly. I am confused, is it possible to control the number of partitions of my DataFrame? KafkaUtils.createStream doesn't seem to accept any parameters related to the number of partitions I want for the rdd. I tried kafkaDF.rdd.repartition( int ), but it doesn't seem to work either.

How can I achieve more parallelism in my code? If my approach is wrong, what is the correct way to achieve it?

解决方案

In Spark Streaming, parallelism can be achieved in two areas: (a) the consumers/receivers (in your case the Kafka consumers), and (b) the processing (done by Spark).

By default, spark streaming will assign one core (aka Thread) to each consumer. So if you need more data to be ingested you need to create more consumers. Each consumer will create a DStream. You can then union the DStreams to get one large stream.

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B

val combineStream = messageStream1.union(messageStream2)

Alternatively, the number of receivers/consumers can be increased by repartitioning the input stream:

inputStream.repartition(<number of partitions>))

All the remaining cores available to the streaming app will be assigned to Spark.

So if you have N cores (defined via the spark.cores.max) and you have C consumers you are left with N-C cores available for Spark.

#Partitions =~  #Consumers x (batch duration / block interval)

block interval = how long a consumer waits before it pushes the data it created as a spark block (defined as configuration spark.streaming.blockInterval).

Always keep in mind that Spark Streaming has two functions that constantly take place. A set of threads that read the current micro-batch (consumers), and a set of threads that process the previous micro-batch (Spark).

For more performance tuning tips please refer to here, here and here.

这篇关于Spark Streaming:如何向 DStream 添加更多分区?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆