How to use JDBC source to write and read data in (Py)Spark?


Question

The goal of this question is to document:

  • steps required to read and write data using JDBC connections in PySpark

  • possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages including Scala and R.

Answer

Writing data

  1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

 bin/pyspark --packages group:name:version  

or combine driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
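For illustration, a minimal sketch of the environment-variable approach from a standalone Python script; the PostgreSQL coordinates and the application name are placeholder assumptions, not part of the original answer:

    import os

    # Must be set before the JVM is launched; the driver coordinates below
    # are only an example -- substitute the ones you actually need.
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--packages org.postgresql:postgresql:42.2.5 pyspark-shell"
    )

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="jdbc-example")   # hypothetical app name
    sqlContext = SQLContext(sc)

The same coordinates could instead go into conf/spark-defaults.conf under spark.jars.packages, as described above.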

  2. Choose the desired mode. The Spark JDBC writer supports the following modes:

  • append: Append contents of this :class:DataFrame to existing data.
  • overwrite: Overwrite existing data.
  • ignore: Silently ignore this operation if data already exists.
  • error (default case): Throw an exception if data already exists.

Upserts or other fine-grained modifications are not supported.

 mode = ...
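For example, a minimal sketch of this step; the choice of append is only illustrative:

    # Any of "append", "overwrite", "ignore", "error" is accepted;
    # "append" is just an example choice.
    mode = "append"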

  3. Prepare the JDBC URI, for example:

     # You can encode credentials in URI or pass
     # separately using properties argument
     # of jdbc method or options
    
     url = "jdbc:postgresql://localhost/foobar"
    

  4. (Optional) Create a dictionary of JDBC arguments.

     properties = {
         "user": "foo",
         "password": "bar"
     }
    

    properties / options can also be used to set supported JDBC connection properties.

  5. Use DataFrame.write.jdbc:

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

     to save the data (see pyspark.sql.DataFrameWriter for details).
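    Putting the write steps together, a minimal end-to-end sketch might look like the following; the toy DataFrame, the baz table, and the credentials are all assumptions for illustration:

      from pyspark import SparkContext
      from pyspark.sql import SQLContext

      sc = SparkContext(appName="jdbc-write-example")   # hypothetical app name
      sqlContext = SQLContext(sc)

      # Toy DataFrame standing in for real data
      df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

      url = "jdbc:postgresql://localhost/foobar"        # example URI from above
      properties = {"user": "foo", "password": "bar"}   # example credentials
      mode = "append"                                   # illustrative choice

      # Writes df to the (assumed) table "baz" over JDBC
      df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)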

    Known issues:

    • Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Assuming there is no driver version mismatch, to solve this you can add the driver class to the properties. For example:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }
    

  • Using df.write.format("jdbc").options(...).save() may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    Solution unknown.

    In PySpark 1.3 you can try calling the Java method directly:

      df._jdf.insertIntoJDBC(url, "baz", True)
    

Reading data

    1. Follow steps 1-4 from Writing data.

    2. Use sqlContext.read.jdbc:

     sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    or sqlContext.read.format("jdbc"):

        (sqlContext.read.format("jdbc")
            .options(url=url, dbtable="baz", **properties)
            .load())
    

    Known issues and gotchas:

    • Suitable driver cannot be found - see: Writing data

    • Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery, as in the sketch below.
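    A hedged sketch of the subquery workaround, reusing url and properties from above; the column names and the WHERE clause are assumptions:

      # The parenthesised query must be aliased so it can stand in for a table
      query = "(SELECT id, value FROM baz WHERE id > 100) AS tmp"

      df = (sqlContext.read.format("jdbc")
          .options(url=url, dbtable=query, **properties)
          .load())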

    • By default JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:

      • Provide a partitioning column (must be IntegerType), lowerBound, upperBound, and numPartitions.
      • Provide a list of mutually exclusive predicates, one for each desired partition.

    A sketch of both options follows below.
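    A minimal sketch of both options, assuming an integer id column and a created_at column; the bounds, partition count, and predicates are illustrative only:

      # Option 1: partitioning column with bounds. lowerBound/upperBound only
      # control the partition stride; they do not filter rows.
      partitioned = sqlContext.read.jdbc(
          url=url,
          table="baz",
          column="id",            # assumed integer column
          lowerBound=1,
          upperBound=100000,
          numPartitions=10,
          properties=properties,
      )

      # Option 2: mutually exclusive predicates, one per desired partition
      predicates = [
          "created_at < '2017-01-01'",
          "created_at >= '2017-01-01'",
      ]
      partitioned = sqlContext.read.jdbc(
          url=url, table="baz", predicates=predicates, properties=properties
      )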

    • In a distributed mode (with partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time there is no guarantee that the final view will be consistent.

    Where to find suitable drivers:

    • Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab in the form compile-group:name:version, substituting the respective fields) or Maven Central Repository.

    Depending on the database, a specialized source might exist and be preferred in some cases:

    • Greenplum - Pivotal Greenplum-Spark Connector
    • Apache Phoenix - Apache Spark Plugin
    • Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
    • Amazon Redshift - Databricks Redshift connector (current versions available only in the proprietary Databricks Runtime; a discontinued open source version is available on GitHub).

