Spark Streaming: How can I add more partitions to my DStream?


Question


I have a spark-streaming app which looks like this:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {
  if (!rdd.isEmpty) {
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition( i => {
      createConnection()
      i.foreach( row => {
        connection.sendToTable()
      })
      closeConnection()
    })
  }
})


And I run it on a YARN cluster using:

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....


When I try to log kafkaDF.rdd.partitions.size, the result turns out to be '1' or '5' most of the time. I am confused: is it possible to control the number of partitions of my DataFrame? KafkaUtils.createStream doesn't seem to accept any parameter related to the number of partitions I want for the RDD. I tried kafkaDF.rdd.repartition(int), but it doesn't seem to work either.


How can I achieve more parallelism in my code? If my approach is wrong, what is the correct way to achieve it?

Answer


In Spark Streaming, parallelism can be achieved in two areas: (a) the consumers/receivers (in your case the Kafka consumers), and (b) the processing (done by Spark).


By default, Spark Streaming will assign one core (i.e., one thread) to each consumer. So, if you need to ingest more data, you need to create more consumers. Each consumer will create a DStream; you can then union the DStreams to get one large stream.

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B

val combineStream = messageStream1.union(messageStream2)
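
The same approach works when a single topic needs more ingestion parallelism: create several receivers for that topic and union the resulting DStreams. A minimal sketch, assuming the receiver-based KafkaUtils.createStream API; the ZooKeeper address, consumer group and topic name are placeholders, not values from the question:

// Sketch: three receivers reading the same topic, unioned into one DStream.
// "zk-host:2181", "my-group" and "topicA" are illustrative placeholders.
val numReceivers = 3
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("topicA" -> 1))
}
val unifiedStream = ssc.union(kafkaStreams)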

Alternatively, instead of adding receivers, the received data can be redistributed across more partitions by repartitioning the input stream (see the "level of parallelism in data receiving" section of the Spark Streaming programming guide, http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving):

inputStream.repartition(<number of partitions>)
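
In the question's code this would mean repartitioning the DStream right after it is created, so that sqlContext.read.json and the per-partition writes run on more tasks. A sketch based on the snippet in the question; the partition count of 10 is purely illustrative and should roughly match the cores left for processing:

// Sketch: redistribute the received data before any heavy processing.
val message = KafkaUtils.createStream(...)
  .map(_._2)
  .repartition(10)   // 10 is illustrative; use ~number of processing cores

message.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // the DataFrame built from this RDD now has ~10 partitions as well
    val kafkaDF = sqlContext.read.json(rdd)
    // ... foreachPartition / writes as in the question ...
  }
}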


All the remaining cores available to the streaming app will be assigned to Spark.


So, if you have N cores in total (defined via spark.cores.max) and C consumers, you are left with N-C cores available for Spark's processing.
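
For example, with the spark-submit flags from the question (3 executors × 5 cores = 15 cores) and a single Kafka receiver, roughly 15 - 1 = 14 cores remain for processing the micro-batches.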

The number of partitions in the RDDs produced by the receivers is then approximately:

#Partitions =~  #Consumers x (batch duration / block interval)


block interval = how long a consumer waits before it pushes the data it has collected as a Spark block (set via the spark.streaming.blockInterval configuration).
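
As a worked example (the batch duration is assumed here, not taken from the question): with a 10-second batch and the default spark.streaming.blockInterval of 200 ms, one consumer yields roughly 1 × (10 000 ms / 200 ms) = 50 partitions per micro-batch; increasing the block interval produces fewer, larger partitions.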


Always keep in mind that Spark Streaming has two kinds of work constantly taking place: a set of threads reading the current micro-batch (the consumers), and a set of threads processing the previous micro-batch (Spark).

For more performance tuning tips, see https://blog.cloudera.com/blog/2016/01/how-cigna-tuned-its-spark-streaming-app-for-real-time-processing-with-apache-kafka/.

