SPARK Cost of Initializing Database Connection in map / mapPartitions context

Question

The following can be found on various forums in relation to mapPartitions and map:

... Consider the case of Initializing a database. If we are using map() or 
foreach(), the number of times we would need to initialize will be equal to 
the no of elements in RDD. Whereas if we use mapPartitions(), the no of times 
we would need to initialize would be equal to number of Partitions ...

Then there is this response:

val newRd = myRdd.mapPartitions(
  partition => {

    val connection = new DbConnection /*creates a db connection per partition*/

    val newPartition = partition.map(
       record => {
         readMatchingFromDB(record, connection)
     })
    connection.close()
    newPartition
  })

So, after having read discussions on various items pertaining to this, my questions are:

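  1. While I can generally understand the performance improvement from using mapPartitions, why, according to the first snippet of text, would the database connection be called every time for each element of an RDD using map? I can't seem to find the right reason.
  2. The same thing does not happen with sc.textFile ... and reading into dataframes from jdbc connections. Or does it? I would be very surprised if it did.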

What am I missing...?

Recommended answer

First of all, this code is not correct. While it looks like an adaptation of the established pattern for foreachPartition, it cannot be used with mapPartitions like this.

Remember that the function passed to mapPartitions takes an Iterator[_] and returns an Iterator[_], and that Iterator.map is lazy, so this code closes the connection before it is actually used.
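The laziness problem can be reproduced without Spark. The following is a minimal sketch, assuming a hypothetical FakeConnection class that stands in for a real database connection; the broken function has the same shape as the function passed to mapPartitions in the question.

```scala
import scala.util.Try

// Hypothetical stand-in for a real database connection (not a Spark or JDBC class).
final class FakeConnection {
  private var open = true
  def lookup(record: String): String = {
    if (!open) throw new IllegalStateException("connection already closed")
    s"$record-matched"
  }
  def close(): Unit = { open = false }
}

object LazyIteratorDemo {
  // Same shape as the function passed to mapPartitions in the question.
  def broken(partition: Iterator[String]): Iterator[String] = {
    val connection = new FakeConnection
    // Iterator.map only records the transformation; no lookup runs here.
    val newPartition = partition.map(record => connection.lookup(record))
    // The connection is closed before the first lookup has been evaluated.
    connection.close()
    newPartition
  }

  def main(args: Array[String]): Unit = {
    // Consuming the iterator triggers the lookups, which now hit a closed connection.
    val result = Try(broken(Iterator("a", "b")).toList)
    println(result.isFailure)
  }
}
```

The failure only shows up when the returned iterator is consumed, which in Spark happens after the function passed to mapPartitions has already returned.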

To use some form of resource that is initialized in mapPartitions, you'll have to design your code in a way that doesn't require explicit resource release.

... the first snippet of text, the database connection be called every time for each element of an RDD using map? I can't seem to find the right reason.

Without the snippet in question the answer must be generic: map and foreach are not designed to handle external state. With the API shown in your question you'd have to write:

rdd.map(record => readMatchingFromDB(record, new DbConnection))

which obviously creates a connection for each element.

It is not impossible to use, for example, a singleton connection pool, doing something similar to:

object Pool {
  lazy val pool = ???
}

rdd.map(record => readMatchingFromDB(record, Pool.pool.getConnection))

but it is not always easy to do it right (think about thread safety). And because connections and similar objects cannot, in general, be serialized, we cannot just use closures.

In contrast, the foreachPartition pattern is both explicit and simple.
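As a rough illustration of that pattern, here is a sketch that runs without Spark. FakeDb and FakeWriteConnection are hypothetical stand-ins used only to make the example observable locally; writePartition has the shape of a function one would pass to foreachPartition.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory sink standing in for a database (local demonstration only).
object FakeDb {
  val written = ArrayBuffer.empty[String]
  var connectionsOpened = 0
  var connectionsClosed = 0
}

// Hypothetical connection that counts how often it is opened and closed.
final class FakeWriteConnection {
  FakeDb.connectionsOpened += 1
  def write(record: String): Unit = { FakeDb.written += record }
  def close(): Unit = { FakeDb.connectionsClosed += 1 }
}

object ForeachPartitionSketch {
  // Shape of the function one would pass as rdd.foreachPartition(writePartition).
  def writePartition(partition: Iterator[String]): Unit = {
    val connection = new FakeWriteConnection // one connection per partition
    try {
      // Iterator.foreach is eager, so every write has finished when it returns.
      partition.foreach(record => connection.write(record))
    } finally {
      // Closing here is safe because nothing is left to evaluate lazily.
      connection.close()
    }
  }
}
```

Because the function returns Unit and consumes the iterator eagerly, the connection's lifetime is fully contained in the function body, which is what makes the pattern safe for writes but unusable as-is for mapPartitions.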

It is of course possible to force eager execution to make things work, for example:

val newRd = myRdd.mapPartitions(
  partition => {

    val connection = new DbConnection /*creates a db connection per partition*/

    val newPartition = partition.map(
       record => {
         readMatchingFromDB(record, connection)
    }).toList
    connection.close()
    newPartition.toIterator
  })

but it is of course risky and can actually decrease performance, since the whole partition has to be materialized in memory before any record is returned.
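One way to keep the iterator lazy, which the answer itself does not show, is to wrap it so that the cleanup runs when the iterator is exhausted. The sketch below assumes a hypothetical DemoConnection and a hypothetical helper closingWhenDone; in a real job the connection would be created inside the function passed to mapPartitions instead of being passed in.

```scala
// Hypothetical stand-in for a real database connection.
final class DemoConnection {
  var closed = false
  def lookup(record: String): String = {
    if (closed) throw new IllegalStateException("connection already closed")
    record.toUpperCase
  }
  def close(): Unit = { closed = true }
}

object ClosingIteratorSketch {
  // Hypothetical helper: wraps an iterator and runs `cleanup` once,
  // the first time hasNext reports that the underlying iterator is exhausted.
  def closingWhenDone[A](underlying: Iterator[A])(cleanup: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var cleaned = false
      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !cleaned) { cleaned = true; cleanup() }
        more
      }
      def next(): A = underlying.next()
    }

  // Shape of the function one would pass to mapPartitions; the connection is a
  // parameter here only so that the example can be inspected from outside.
  def lookupPartition(connection: DemoConnection)(partition: Iterator[String]): Iterator[String] =
    closingWhenDone(partition.map(record => connection.lookup(record)))(() => connection.close())
}
```

This sketch has a limitation: the cleanup does not run if the consumer stops before exhausting the iterator or if a lookup throws, so a real implementation would need an additional safeguard for those cases.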

The same thing does not happen with sc.textFile ... and reading into dataframes from jdbc connections. Or does it?

Both operate using a much lower-level API, but of course resources are not initialized for each record.
