Spark: DB connection per Spark RDD partition and do mapPartitions
Question
I want to do a mapPartitions on my Spark RDD:
val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /* creates a db connection per partition */
  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  })
  connection.close()
  newPartition
})
But this gives me a connection-already-closed exception, as expected, because the connection is closed before control reaches the .map(). I want to create one connection per RDD partition and close it properly. How can I achieve this?
Thanks!
Answer
As mentioned in the discussion here, the issue stems from the laziness of the map operation on the iterator partition. Because of this laziness, for each partition a connection is created and immediately closed, and readMatchingFromDB is only called later, when the RDD is actually acted upon.
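To see the laziness in isolation, here is a minimal plain-Scala sketch (no Spark involved, not from the original answer); the println stands in for readMatchingFromDB:

val it = Iterator(1, 2, 3).map { x => println(s"reading $x"); x * 2 }
// nothing has printed yet: Iterator.map is lazy
// closing a connection at this point would be "too early", as in the question
val result = it.toList // only now does "reading ..." run for each element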
To resolve this, force an eager traversal of the iterator before closing the connection, e.g. by converting it into a list (and then back to an iterator):
val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /* creates a db connection per partition */
  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, so readMatchingFromDB runs now
  connection.close()
  newPartition.iterator // return a fresh iterator over the materialized results
})
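One caveat: .toList materializes the whole partition in memory, which can be costly for large partitions. A hedged alternative sketch (not from the original answer) wraps the result in a custom iterator that closes the connection once the partition is exhausted; Result is a stand-in for whatever readMatchingFromDB returns, and note the connection would leak if a downstream consumer abandons the iterator early (e.g. via take):

val newRd = myRdd.mapPartitions { partition =>
  val connection = new DbConnection // one connection per partition, as before
  var closed = false
  new Iterator[Result] { // Result: hypothetical return type of readMatchingFromDB
    def hasNext: Boolean = {
      val more = partition.hasNext
      if (!more && !closed) { connection.close(); closed = true } // close once exhausted
      more
    }
    def next(): Result = readMatchingFromDB(partition.next(), connection)
  }
}

This keeps the per-record streaming behavior of mapPartitions at the price of more fragile cleanup, so the toList approach above remains the simpler choice when partitions fit in memory.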