How to use JDBC source to write and read data in (Py)Spark?
Question
The goal of this question is to document:

- steps required to read and write data using JDBC connections in PySpark
- possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages including Scala and R.
Answer
Writing data
Include the applicable JDBC driver when you submit the application or start the shell. You can, for example, use --packages:

bin/pyspark --packages group:name:version

or combine driver-class-path and jars:

bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
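For example, a conf/spark-defaults.conf entry for the PostgreSQL driver might look like this (the coordinate version is only an illustration; pick one that matches your server):

```
spark.jars.packages  org.postgresql:postgresql:42.2.5
```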
Choose the desired mode. The Spark JDBC writer supports the following modes:

- append: Append contents of this DataFrame to existing data.
- overwrite: Overwrite existing data.
- ignore: Silently ignore this operation if data already exists.
- error (default case): Throw an exception if data already exists.

Upserts or other fine-grained modifications are not supported.
mode = ...
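As a sketch, the mode is just a plain string later passed to the writer ("append" is chosen here purely for illustration):

```python
# Save modes accepted by the JDBC writer; "error" is the default
# (later Spark versions also accept the alias "errorifexists").
supported_modes = {"append", "overwrite", "ignore", "error"}

mode = "append"  # chosen purely for illustration
```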
Prepare the JDBC URI, for example:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
(Optional) Create a dictionary of JDBC arguments.
properties = {
"user": "foo",
"password": "bar"
}
properties / options can also be used to set supported JDBC connection properties.
Use DataFrame.write.jdbc

df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

to save the data (see pyspark.sql.DataFrameWriter for details).
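Putting the steps together, a minimal write might be sketched as follows (the URI, credentials, and table name are placeholders, and df is assumed to be an existing DataFrame, so the final call is left commented out):

```python
url = "jdbc:postgresql://localhost/foobar"  # placeholder URI
mode = "append"                             # one of the supported modes

properties = {
    "user": "foo",       # placeholder credentials
    "password": "bar",
    # Explicit driver class; see the known issues below.
    "driver": "org.postgresql.Driver",
}

# df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
```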
Known issues:
A suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)
Assuming there is no driver version mismatch, to solve this you can add the driver class to the properties. For example:
properties = {
...
"driver": "org.postgresql.Driver"
}
Using df.write.format("jdbc").options(...).save() may result in:

java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

Solution unknown.
In PySpark 1.3 you can try calling the Java method directly:
df._jdf.insertIntoJDBC(url, "baz", True)
Reading data

Follow the setup steps from Writing data.
Use sqlContext.read.jdbc:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
or sqlContext.read.format("jdbc"):
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
Known issues and gotchas:
A suitable driver cannot be found - see: Writing data
Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery.
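As a hypothetical illustration (the table and column names are made up), an aggregation can be pushed into the database by wrapping it in an aliased subquery and passing that as dbtable:

```python
# A subquery evaluated by the database itself; "baz", "color" and
# "price" are placeholder names. The alias ("AS tmp") is required so
# the result can stand in wherever a table name is expected.
subquery = ("(SELECT color, SUM(price) AS total_price "
            "FROM baz GROUP BY color) AS tmp")

# df = (sqlContext.read.format("jdbc")
#       .options(url=url, dbtable=subquery, **properties)
#       .load())
```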
By default, JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:
- Provide a partitioning column (must be IntegerType), lowerBound, upperBound, and numPartitions.
- Provide a list of mutually exclusive predicates predicates, one for each desired partition.
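For the second option, the predicates have to cover the table without overlap. One way to sketch this, assuming an integer column with illustrative bounds, is a small helper that builds non-overlapping range predicates:

```python
def range_predicates(column, lower, upper, n):
    """Build n mutually exclusive range predicates covering [lower, upper]."""
    step = (upper - lower + 1) // n
    # The last bound is upper + 1 so the final partition absorbs any
    # remainder when the range does not divide evenly.
    bounds = [lower + i * step for i in range(n)] + [upper + 1]
    return ["{0} >= {1} AND {0} < {2}".format(column, bounds[i], bounds[i + 1])
            for i in range(n)]

predicates = range_predicates("id", 1, 100, 4)
# df = sqlContext.read.jdbc(url=url, table="baz",
#                           predicates=predicates, properties=properties)
```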
See:

- Partitioning in spark while reading from RDBMS via JDBC,
- How to optimize partitioning when migrating data from a JDBC source?,
- How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
- How to partition Spark RDD when importing Postgres using JDBC?
In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
Where to find suitable drivers: Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab in the form compile-group:name:version, substituting the respective fields) or Maven Central Repository:
Depending on the database, a specialized source might exist and be preferred in some cases:
- Greenplum - Pivotal Greenplum-Spark Connector
- Apache Phoenix - Apache Spark Plugin
- Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
- Amazon Redshift - Databricks Redshift connector (current versions are available only in the proprietary Databricks Runtime; a discontinued open source version is available on GitHub).