How to speed up spark df.write jdbc to postgres database?


Problem Description


I am new to spark and am attempting to speed up appending the contents of a dataframe (which can have between 200k and 2M rows) to a postgres database using df.write:

df.write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save()

I tried increasing the batchsize but that didn't help, as completing this task still took ~4 hours. I've also included some snapshots below from aws emr showing more details about how the job ran. The task of saving the dataframe to the postgres table was only assigned to one executor (which I found strange); would speeding this up involve dividing this task between executors?

Also, I have read spark's performance tuning docs, but increasing the batchsize and queryTimeout has not seemed to improve performance. (I tried calling df.cache() in my script before df.write, but the runtime of the script was still 4 hrs.)

Additionally, my aws emr hardware setup and spark-submit are:

Master Node (1): m4.xlarge

Core Nodes (2): m5.xlarge

spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...

Solution

Spark is a distributed data processing engine, so when you are processing your data or saving it to the file system it uses all of its executors to perform the task. Spark JDBC is slow because when you establish a JDBC connection, only one of the executors establishes a link to the target database, resulting in slow speeds and failures.

To overcome this problem and speed up data writes to the database you need to use one of the following approaches:

Approach 1:

In this approach you need to use the postgres COPY command utility to speed up the write operation. This requires the psycopg2 library to be available on your EMR cluster.

The documentation for the COPY utility is here

If you want to know the benchmark differences and why COPY is faster, visit here!

Postgres also suggests using the COPY command for bulk inserts. So how do you bulk insert a spark dataframe? To implement faster writes, first save your spark dataframe to the EMR file system in CSV format, and repartition your output so that no file contains more than 100k rows.

#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data")
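
The comment above talks about choosing the partition count dynamically from the row count; below is a minimal sketch of one way to do that (rows_per_file is an assumed target, not part of the original answer, and df.count() triggers an extra Spark job).

# Sketch: derive the partition count from the dataframe size so that each output
# file holds at most ~100k rows. rows_per_file is an assumption; df.count() adds
# an extra pass over the data before the write.
rows_per_file = 100000
num_partitions = max(1, -(-df.count() // rows_per_file))  # ceiling division

df.repartition(num_partitions) \
  .write \
  .option("maxRecordsPerFile", rows_per_file) \
  .mode("overwrite") \
  .csv("path/to/save/data")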

Now read the files using python and execute the COPY command for each file.

import psycopg2

# Paths of the CSV part files written by the Spark job above; you can also build
# this list automatically with the os or glob modules (see the sketch just below).
file_paths = ['path/to/save/data/part-00000_0.csv',
              'path/to/save/data/part-00000_1.csv']

# Open a connection and stream one CSV file into the target table with COPY.
# dbname, user, password, host, port and 'table_name' are placeholders for your own values.
# Taking a file path (rather than an open file object) keeps the function usable
# with the multiprocessing pool further below.
def execute_copy(file_path):
    con = psycopg2.connect(database=dbname, user=user, password=password, host=host, port=port)
    cursor = con.cursor()
    with open(file_path) as f:
        cursor.copy_from(f, 'table_name', sep=",")
    con.commit()
    con.close()
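
If there are many part files, listing them by hand gets tedious; here is a small sketch using the glob module, assuming the output directory used above and Spark's usual part-file naming.

import glob

# Pick up every CSV part file produced by the Spark write above.
file_paths = sorted(glob.glob('path/to/save/data/part-*.csv'))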

To gain an additional speed boost, since you are using an EMR cluster you can leverage python multiprocessing to copy more than one file at a time.

from multiprocessing import Pool, cpu_count

# Each worker process runs execute_copy on one file path; passing paths (not open
# file objects) keeps the arguments picklable for the pool.
with Pool(cpu_count()) as p:
    print(p.map(execute_copy, file_paths))
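
One caveat worth noting: copy_from with sep="," does not understand CSV quoting, so if your data contains embedded commas or quoted fields, a variant built on psycopg2's copy_expert and Postgres's own CSV parser may be safer. A sketch, using the same placeholder connection details and table name as above:

# Sketch: same idea as execute_copy, but using COPY ... WITH (FORMAT csv) so that
# quoted fields and embedded commas are parsed by Postgres itself.
def execute_copy_csv(file_path):
    con = psycopg2.connect(database=dbname, user=user, password=password, host=host, port=port)
    cursor = con.cursor()
    with open(file_path) as f:
        cursor.copy_expert("COPY table_name FROM STDIN WITH (FORMAT csv)", f)
    con.commit()
    con.close()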

This is the recommended approach, as spark JDBC cannot be tuned to gain higher write speeds due to connection constraints.

Approach 2: Since you are already using an AWS EMR cluster, you can always leverage hadoop capabilities to perform your table writes faster. Here we will use sqoop export to export our data from EMRFS to the postgres db.

#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

Why sqoop? Because sqoop opens multiple connections to the database based on the number of mappers specified. So if you specify -m as 8, there will be 8 concurrent connection streams writing data to postgres.

Also, for more information on using sqoop go through this AWS Blog, SQOOP Considerations and SQOOP Documentation.

If you can hack your way around with code, then Approach 1 will definitely give you the performance boost you seek; if you are comfortable with hadoop components like SQOOP, then go with the second approach.

Hope it helps!
