Apache Spark - foreach Vs foreachPartitions When to use What?


Question

I would like to know if foreachPartition will result in better performance, due to a higher level of parallelism, compared to the foreach method, considering the case in which I'm flowing through an RDD in order to perform some sums into an accumulator variable.

Recommended Answer

foreach and foreachPartitions are actions.

foreach(function): Unit

A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function. This is generally used for manipulating accumulators or writing to external stores.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
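To see why, here is a minimal sketch (variable names are illustrative, assuming an existing SparkContext named sc): a plain local variable captured by the closure is copied to each executor, so mutations never reach the driver, whereas an accumulator's updates are merged back correctly.

    var localCounter = 0L // captured by the closure; each executor mutates its own copy

    sc.parallelize(1 to 100).foreach(x => localCounter += x)
    println(localCounter) // typically still 0 on the driver in cluster mode - undefined behavior

    val safeCounter = sc.longAccumulator("safe") // accumulator updates are merged on the driver
    sc.parallelize(1 to 100).foreach(x => safeCounter.add(x))
    println(safeCounter.value) // 5050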

Example:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

foreachPartition(function): Unit

Similar to foreach(), but instead of invoking the function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions()).

Example usages of foreachPartition:

  • Example 1: for each partition you want to use one database connection (inside each partition block); here is an example of how to do it using Scala.

import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.DataFrame

/**
  * Insert into a database table using foreachPartition.
  *
  * @param dataFrame                   data to insert
  * @param sqlDatabaseConnectionString JDBC connection string
  * @param sqlTableName                target table name
  * @param numPartitions               number of simultaneous DB connections you plan to allow
  */
def insertToTable(dataFrame: DataFrame,
                  sqlDatabaseConnectionString: String,
                  sqlTableName: String,
                  numPartitions: Int): Unit = {

  // Repartition so that at most numPartitions connections are opened at once.
  val repartitioned = dataFrame.repartition(numPartitions)
  val tableHeader: String = repartitioned.columns.mkString(",")

  repartitioned.foreachPartition { partition =>
    // Note: one connection per partition (an even better way is to use a connection pool)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases (e.g. Azure SQL)
    // can't handle batches of more than 1000 rows.
    partition.grouped(1000).foreach { group =>
      val insertString = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }
      sqlExecutorConnection.createStatement()
        .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }
    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
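A hypothetical invocation of the method above might look like this (the JDBC connection string and table name are placeholders, and spark is assumed to be an existing SparkSession):

    val df = spark.range(1, 1001).toDF("id")
    insertToTable(df,
      "jdbc:sqlserver://<host>;databaseName=<db>;user=<user>;password=<password>",
      "my_table",
      numPartitions = 8)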

  • Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Only once per partition: you can safely share a thread-safe Kafka producer instance.
        val producer = createKafkaProducer()
        partitionOfRecords.foreach { message =>
          producer.send(message)
        }
        producer.close()
      }
    }
    

    Note: if you want to avoid creating the producer once per partition this way, a better approach is to broadcast the producer using sparkContext.broadcast, since the Kafka producer is asynchronous and buffers data heavily before sending.
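    A common realization of that advice is to wrap the producer in a serializable class that creates it lazily on each executor, and broadcast the wrapper; the sketch below shows the pattern (class and helper names are illustrative, assuming the kafka-clients API):

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Serializable wrapper: KafkaProducer itself is not serializable, so it is
    // built lazily (once per executor JVM) from the serializable factory function.
    // Do not touch `producer` on the driver, or it will be created there instead.
    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
      lazy val producer: KafkaProducer[String, String] = createProducer()
      def send(topic: String, value: String): Unit =
        producer.send(new ProducerRecord[String, String](topic, value))
    }

    object KafkaSink {
      def apply(config: Properties): KafkaSink =
        new KafkaSink(() => new KafkaProducer[String, String](config))
    }

    // Driver side: broadcast once, then reuse inside every partition of every micro-batch.
    // val kafkaSink = sc.broadcast(KafkaSink(producerConfig))
    // dstream.foreachRDD { rdd =>
    //   rdd.foreachPartition { records =>
    //     records.foreach(record => kafkaSink.value.send("my-topic", record))
    //   }
    // }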


    Accumulator sample snippets to play around with, through which you can test the performance:

    
         test("Foreach - Spark") {
            import spark.implicits._
            var accum = sc.longAccumulator
            sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
            assert(accum.value == 6L)
          }
    
          test("Foreach partition - Spark") {
            import spark.implicits._
            var accum = sc.longAccumulator
            sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
            assert(accum.value == 6L)
          }
    

    Conclusion:

    foreachPartition operates on whole partitions, so it obviously has an edge over foreach whenever there is per-partition work to amortize.

    Rule of thumb:

    foreachPartition should be used when you are accessing costly resources such as database connections or a Kafka producer, which you would then initialize once per partition rather than once per element (as with foreach). When it comes to accumulators, you can measure the performance with the test methods above; it should work faster with accumulators as well.

    Also see map vs mapPartitions, which has a similar concept, but they are transformations.
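    For reference, a minimal sketch of that analogous pair (the doubling logic is purely illustrative, assuming an existing SparkContext named sc):

    val rdd = sc.parallelize(1 to 4, numSlices = 2)

    // map: the function is invoked once per element
    val doubled = rdd.map(_ * 2)

    // mapPartitions: the function is invoked once per partition and receives an
    // iterator, so per-partition setup (e.g. a connection) can be done once and reused
    val doubledPerPartition = rdd.mapPartitions(iter => iter.map(_ * 2))

    assert(doubled.collect().sameElements(doubledPerPartition.collect()))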

