Pyspark - reusing JDBC connection


Problem description

I have the following task:

  • Load data from one table across multiple schemas
  • Use PySpark
  • Use a single user that has access to all schemas in the database

I am using the following code (more or less):

def connect_to_oracle_db(spark_session, db_query):
    return spark_session.read \
        .format("jdbc") \
        .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service_name>") \
        .option("user", "<user>") \
        .option("password", "<pass>") \
        .option("dbtable", db_query) \
        .option("driver", "oracle.jdbc.driver.OracleDriver")

def run(self):
    all_schemes = <list of all available schemes>
    for str_schema in all_schemes:
        db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
        df_table1 = connect_to_oracle_db(spark_session, db_query).load()
        # process df_table1

There are around 300 schemas, and it is quite slow because for each iteration a new connection is created and closed. I want to find a way to reuse the existing connection or somehow create a connection pool. It looks quite inefficient to me.

Do you have any idea how to reuse the connection or create a connection pool for PySpark?

Recommended answer

There is no place for a connection pool in the classical sense in a distributed system like Spark. You have to remember that each partition can be processed by a different physical node, a different logical container (if applicable on the given cluster manager) and, finally, a different JVM.

Not that a connection pool could really help in such a case. Since Spark is intended for massive imports, the utilization of each individual connection is already pretty high.

There are, however, other visible problems here (and possibly more that are not obvious from the snippet, since the code you've shown doesn't actually fetch the data):

  • You didn't configure fetchsize, so the default for the particular driver will be used. For Oracle it is 10, which is completely unfit for large-scale processing:

return spark_session.read \
    .format("jdbc") \
    .option("fetchsize", some_reasonable_value) \
    ...

  • You didn't configure partitioning, so Spark will process all the data using a single partition. You can learn about possible solutions in How to optimize partitioning when migrating data from JDBC source? (a sketch of the relevant options is shown below).
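
    For illustration only - a minimal sketch of such a partitioned read, assuming TABLE1 has a numeric column (hypothetically named ID here) whose value range is roughly known; the column name, bounds, partition count and fetch size are made-up example values, not anything taken from the question:

# "ID", the bounds, 16 partitions and fetchsize=10000 are assumptions;
# pick them based on the actual data and cluster.
df_table1 = spark_session.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service_name>") \
    .option("user", "<user>") \
    .option("password", "<pass>") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("dbtable", "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA") \
    .option("fetchsize", 10000) \
    .option("partitionColumn", "ID") \
    .option("lowerBound", 1) \
    .option("upperBound", 10000000) \
    .option("numPartitions", 16) \
    .load()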

  • You've modeled this as a sequential process. Unless the datasets are somehow combined downstream, it would be best to submit a separate job for each table and let the scheduler optimize things according to the available resources.

    You can also consider processing the tables in parallel within a single application (see the sketch below).
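
    As a rough sketch of such driver-side parallelism (Spark's scheduler is thread-safe, so actions started from several driver threads simply run as concurrent jobs); load_and_process, process_table and the pool size are assumed names and values, not part of the original code:

from concurrent.futures import ThreadPoolExecutor

def load_and_process(str_schema):
    # Reuse the reader from the question for a single schema and trigger the load.
    db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
    df_table1 = connect_to_oracle_db(spark_session, db_query).load()
    process_table(df_table1)  # hypothetical stand-in for "# process df_table1"

# 8 concurrent loads is an arbitrary example; tune it to the cluster and the database.
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(load_and_process, all_schemes))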

And just to reiterate - Spark is lazy, so the core problem may be in some other place, and the issues listed above may be secondary.

