Spark: DB connection per Spark RDD partition and do mapPartition

Problem description

I want to do a mapPartitions on my Spark RDD:

    val newRd = myRdd.mapPartitions(
      partition => {
        val connection = new DbConnection /* creates a db connection per partition */

        val newPartition = partition.map(record => {
          readMatchingFromDB(record, connection)
        })
        connection.close()
        newPartition
      })

But this gives me a connection-already-closed exception, as expected, because the connection is closed before control reaches the .map(). I want to create one connection per RDD partition and close it properly. How can I achieve this?

Thanks!

Recommended answer

As mentioned in the discussion here, the issue stems from the laziness of the map operation on the iterator partition. Because of this laziness, for each partition the connection is created and immediately closed, and readMatchingFromDB is only called later, when the RDD is acted upon.
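
To see the pitfall in isolation, here is a minimal, Spark-free sketch (the closed flag and the sample values are made up for illustration): Scala's Iterator.map is lazy, so the mapping body only runs when the iterator is actually consumed.

// Iterator.map is lazy: the body below runs only when the iterator is consumed.
var closed = false
val it = Iterator(1, 2, 3).map { n =>
  if (closed) sys.error("connection already closed!")
  n * 2
}
closed = true       // analogous to connection.close()
it.foreach(println) // throws here: the mapping runs only now, after "closing"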

To resolve this, force an eager traversal of the iterator before closing the connection, e.g. by converting it into a list (and then back):

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /* creates a db connection per partition */

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB

  connection.close()
  newPartition.iterator // create a new iterator
})
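
Note that toList materializes the whole partition in memory, which is fine for modestly sized partitions but can be expensive for large ones. If you would rather keep the iterator lazy, a common alternative is to let Spark close the connection when the task finishes. This is only a sketch, reusing the question's DbConnection and readMatchingFromDB and assuming Spark 2.4+ for the addTaskCompletionListener[U] overload:

import org.apache.spark.TaskContext

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection // one connection per partition, as before

  // Spark invokes this listener after the task has fully consumed the
  // (still lazy) iterator returned below, so the connection stays open
  // exactly as long as it is needed.
  TaskContext.get().addTaskCompletionListener[Unit] { _ =>
    connection.close()
  }

  partition.map(record => readMatchingFromDB(record, connection))
})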
