Apache Spark - foreach Vs foreachPartition When to use What?

Problem Description


I would like to know if foreachPartition will result in better performance, due to a higher level of parallelism, compared to the foreach method, considering the case in which I'm iterating through an RDD in order to perform some sums into an accumulator variable.

Solution

foreach and foreachPartition are actions.

foreach(function): Unit

A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function. This is generally used for manipulating accumulators or writing to external stores.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Example:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
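For contrast, a minimal sketch of the pitfall the note above warns about (not part of the original answer): a plain local variable captured by the foreach closure is serialized and shipped to the executors, so each task mutates its own copy.

var counter = 0
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => counter += x)
// Each executor updated its own deserialized copy of `counter`, so the
// driver-side value typically remains 0; Spark documents this behavior
// as undefined, which is why accumulators should be used instead.
println(counter)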

foreachPartition(function): Unit

Similar to foreach(), but instead of invoking the function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions()).
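A minimal sketch of that iterator-based signature (the RDD and names here are illustrative, not from the original answer):

val rdd = sc.parallelize(1 to 8, 4) // 4 partitions

rdd.foreachPartition { partition: Iterator[Int] =>
  // Invoked once per partition; per-partition setup would go here.
  val sum = partition.sum // consume the iterator
  println(s"partition sum = $sum")
}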

Example usages of foreachPartition:


  • Example 1: if you want to use one database connection per partition (inside the foreachPartition block), this is an example of how it can be done using Scala; a hypothetical call site follows the code.

import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.DataFrame

/**
  * Insert into a database table using foreachPartition.
  *
  * @param dataFrame                   the DataFrame to insert
  * @param sqlDatabaseConnectionString JDBC connection string
  * @param sqlTableName                target table name
  * @param numPartitions               number of simultaneous DB connections you plan to allow
  */
def insertToTable(dataFrame: DataFrame,
                  sqlDatabaseConnectionString: String,
                  sqlTableName: String,
                  numPartitions: Int): Unit = {

  val repartitioned = dataFrame.repartition(numPartitions)
  val tableHeader: String = repartitioned.columns.mkString(",")

  repartitioned.foreachPartition { partition =>
    // Note: one connection per partition (an even better way is to use a connection pool)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases, e.g. Azure SQL,
    // cannot use a batch size of more than 1000.
    partition.grouped(1000).foreach { group =>
      val insertString = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }
      sqlExecutorConnection.createStatement()
        .executeUpdate(s"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }
    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
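A hypothetical call site for the sketch above (the input path, JDBC string, and table name are placeholders, not from the original answer):

val df = spark.read.parquet("/data/events") // placeholder input
insertToTable(
  df,
  "jdbc:sqlserver://myserver:1433;databaseName=mydb;user=me;password=secret", // placeholder
  "dbo.Events", // placeholder
  numPartitions = 8) // number of concurrent DB connections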

  • Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Only once per partition: you can safely share a thread-safe
    // Kafka producer instance here.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message) // message is assumed to already be a ProducerRecord
    }
    producer.close()
  }
}
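createKafkaProducer() is left undefined above; a minimal sketch of what it could look like with the standard Kafka client (the broker address is a placeholder, and String key/value types are an assumption):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

def createKafkaProducer(): KafkaProducer[String, String] = {
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092") // placeholder broker list
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](props)
}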

Note: if you want to avoid creating the producer once per partition like this, a better way is to broadcast the producer using sparkContext.broadcast, since the Kafka producer is asynchronous and buffers data heavily before sending.
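A hedged sketch of that broadcast approach: KafkaProducer itself is not serializable, so a common pattern (an assumption here, not spelled out in the answer) is to broadcast a serializable wrapper that creates the producer lazily, once per executor.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper; the lazy val is materialized once per executor JVM.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, value))
}

val kafkaSink = sc.broadcast(new KafkaSink(() => createKafkaProducer()))

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send("my-topic", message) // topic name is a placeholder
  }
}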


Accumulator sample snippets to play around with, through which you can test the performance:

     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }

      test("Foreach partition - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
        assert(accum.value == 6L)
      }
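These tests assume spark and sc are already in scope; a minimal fixture sketch (ScalaTest's FunSuite and a local master are assumptions, not from the original answer):

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

class ForeachPerfSpec extends FunSuite {
  // Local SparkSession for running the two test(...) blocks above.
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("foreach-vs-foreachPartition")
    .getOrCreate()
  val sc = spark.sparkContext
  // the two test(...) blocks above go inside this class
}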

Conclusion:

foreachPartition operates on whole partitions, so it clearly has an edge over foreach whenever there is per-partition work to amortize.

Rule of Thumb:

foreachPartition should be used when you are accessing costly resources such as database connections or Kafka producers, which would then be initialized once per partition rather than once per element (as with foreach). When it comes to accumulators, you can measure the performance with the test methods above; foreachPartition should work faster in the accumulator case as well.

Also... see map vs mapPartitions, which follow a similar concept but are transformations.
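For reference, a minimal sketch of that transformation-side analogue (illustrative only): map invokes its function per element, mapPartitions once per partition with an iterator, and neither runs until an action is called.

val nums = sc.parallelize(1 to 8, 4)

// Per-element transformation: the function runs once for each element.
val doubled = nums.map(_ * 2)

// Per-partition transformation: the function runs once per partition,
// so expensive setup can be amortized across the partition's elements.
val doubledToo = nums.mapPartitions { iter =>
  // expensive per-partition setup would go here
  iter.map(_ * 2)
}

doubled.collect()    // actions trigger the lazy transformations
doubledToo.collect()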
