How to use JDBC source to write and read data in (Py)Spark?
Problem description
The goal of this question is to document:

- steps required to read and write data using JDBC connections in PySpark
- possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages, including Scala and R.
Writing data
1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

       bin/pyspark --packages group:name:version

   or combine driver-class-path and jars:

       bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
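As a concrete illustration of this step, the invocation below starts the PySpark shell with a PostgreSQL driver resolved from Maven. The coordinates, and especially the version number, are assumptions; pick the version that matches your database from the Maven Repository.

```shell
# Start PySpark with a PostgreSQL JDBC driver pulled from Maven.
# org.postgresql:postgresql:42.2.5 is an assumed version; substitute
# the coordinates appropriate for your database and driver.
bin/pyspark --packages org.postgresql:postgresql:42.2.5
```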
2. Choose the desired mode. The Spark JDBC writer supports the following modes:

   - append: append contents of this DataFrame to existing data.
   - overwrite: overwrite existing data.
   - ignore: silently ignore this operation if data already exists.
   - error (default): throw an exception if data already exists.

   Upserts or other fine-grained modifications are not supported:

       mode = ...
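The four modes above can be captured in a small validation helper. check_mode below is a hypothetical convenience for illustration, not part of the PySpark API; it just makes the set of accepted values and the default explicit.

```python
# Save modes accepted by the Spark JDBC writer; "error" is the default.
# check_mode is a hypothetical helper, not part of the PySpark API.
VALID_MODES = {"append", "overwrite", "ignore", "error"}

def check_mode(mode="error"):
    if mode not in VALID_MODES:
        raise ValueError(f"unknown save mode: {mode!r}")
    return mode

mode = check_mode("append")  # passes validation
```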
3. Prepare the JDBC URI, for example:

       # You can encode credentials in the URI or pass them
       # separately using the properties argument
       # of the jdbc method or options
       url = "jdbc:postgresql://localhost/foobar"
4. (Optional) Create a dictionary of JDBC arguments:

       properties = {"user": "foo", "password": "bar"}
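The two ways of supplying credentials mentioned above can be sketched in plain Python. The user and password values are placeholders; note that when credentials go into the URI itself, special characters must be percent-encoded.

```python
from urllib.parse import quote_plus

# Placeholder credentials for illustration only.
user, password = "foo", "b@r"

# Option 1: keep the URI clean and pass credentials separately
# via the properties argument of the jdbc method.
url = "jdbc:postgresql://localhost/foobar"
properties = {"user": user, "password": password}

# Option 2: encode credentials into the URI; characters such as
# "@" must be percent-encoded or the URI is misparsed.
url_with_creds = (
    "jdbc:postgresql://localhost/foobar"
    f"?user={quote_plus(user)}&password={quote_plus(password)}"
)
print(url_with_creds)
# jdbc:postgresql://localhost/foobar?user=foo&password=b%40r
```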
5. Use DataFrame.write.jdbc

       df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

   to save the data (see pyspark.sql.DataFrameWriter for details).
Known issues:

- Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...). Assuming there is no driver version mismatch, you can solve this by adding the driver class to properties. For example:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }

- Using df.write.format("jdbc").options(...).save() may result in:

      java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

  Solution unknown.

- In PySpark 1.3 you can try calling the Java method directly:

      df._jdf.insertIntoJDBC(url, "baz", True)
Reading data

1. Follow steps 1-4 from Writing data.
2. Use sqlContext.read.jdbc:

       sqlContext.read.jdbc(url=url, table="baz", properties=properties)

   or sqlContext.read.format("jdbc"):

       (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
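The **properties expansion in the second form is plain Python keyword-argument unpacking. The options function below is a stand-in for illustration only (not the real DataFrameReader.options); it shows how the connection settings merge into a single set of options alongside url and dbtable.

```python
# Stand-in for DataFrameReader.options, to show how **properties
# unpacks into keyword arguments alongside url and dbtable.
# Illustration only, not the real PySpark API.
def options(**kwargs):
    return kwargs

properties = {"user": "foo", "password": "bar"}
opts = options(url="jdbc:postgresql://localhost/foobar",
               dbtable="baz", **properties)
print(sorted(opts))
# ['dbtable', 'password', 'url', 'user']
```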
Known issues and gotchas:

- Suitable driver cannot be found - see: Writing data.
- Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery.
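The subquery workaround amounts to building a derived-table string and passing it where the table name would go. The column names and predicate below are hypothetical; JDBC sources require the subquery to carry an alias.

```python
# Build a derived table (subquery) so that filtering happens in the
# database rather than relying on predicate pushdown. Column and
# predicate names here are hypothetical.
table = "baz"
predicate = "visits > 100"
dbtable = f"(SELECT id, name FROM {table} WHERE {predicate}) AS tmp"

# This string is then used in place of the table name, e.g.:
# sqlContext.read.jdbc(url=url, table=dbtable, properties=properties)
print(dbtable)
# (SELECT id, name FROM baz WHERE visits > 100) AS tmp
```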
Where to find suitable drivers:

- Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab).