How to speed up spark df.write jdbc to postgres database?


Problem description


I am new to Spark and am attempting to speed up appending the contents of a dataframe (which can have between 200k and 2M rows) to a Postgres database using df.write:

df.write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=2000000,
      queryTimeout=690
      ).mode(mode).save()

I tried increasing the batchsize, but that didn't help, as completing this task still took ~4 hours. I've also included some snapshots below from AWS EMR showing more details about how the job ran. The task of saving the dataframe to the Postgres table was assigned to only one executor (which I found strange); would speeding this up involve dividing this task between executors?

Also, I have read Spark's performance tuning docs, but increasing the batchsize and queryTimeout has not seemed to improve performance. (I tried calling df.cache() in my script before df.write, but the runtime for the script was still 4 hours.)

Additionally, my AWS EMR hardware setup and spark-submit command are:

Master Node (1): m4.xlarge

Core Nodes (2): m5.xlarge

spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...

Solution

Spark is a distributed data processing engine, so when you are processing your data or saving it to a file system it uses all of its executors to perform the task. Spark JDBC is slow because when you establish a JDBC connection, one of the executors establishes a link to the target database, resulting in slow speeds and failures.
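For context on that behavior: in a Spark JDBC write, each partition of the dataframe is written over its own connection, so a dataframe that ends up with a single partition is written by a single executor. Below is a minimal sketch of spreading the write across executors; the partition count and batch size are hypothetical, and the write still goes through batched INSERTs, which is why the approaches below are preferred.

#Illustration only: each partition writes over its own JDBC connection, so more partitions = more concurrent writers
df.repartition(8).write.format('jdbc').options(
      url=psql_url_spark,
      driver=spark_env['PSQL_DRIVER'],
      dbtable="{schema}.{table}".format(schema=schema, table=table),
      user=spark_env['PSQL_USER'],
      password=spark_env['PSQL_PASS'],
      batchsize=10000
      ).mode(mode).save()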

To overcome this problem and speed up data writes to the database you need to use one of the following approaches:

Approach 1:

In this approach you need to use the Postgres COPY command utility in order to speed up the write operation. This requires you to have the psycopg2 library installed on your EMR cluster.

The documentation for the COPY utility is here.

If you want to know the benchmark differences and why COPY is faster, visit here!

Postgres also suggests using the COPY command for bulk inserts. So how do you bulk insert a Spark dataframe? To implement faster writes, first save your Spark dataframe to the EMR file system in CSV format, and repartition the output so that no file contains more than 100k rows.

#Repartition your dataframe dynamically based on the number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data")

Now read the files using Python and execute a COPY command for each one.

import psycopg2

#Paths to the CSV part files written by Spark (plain strings, so they can be passed to worker processes below);
#you can also build this list with the os or glob module
file = 'path/to/save/data/part-00000_0.csv'
file1 = 'path/to/save/data/part-00000_1.csv'

#Open one connection per file and stream the file into the target table with COPY
def execute_copy(fileName):
    con = psycopg2.connect(database=dbname, user=user, password=password, host=host, port=port)
    cursor = con.cursor()
    with open(fileName) as f:
        cursor.copy_from(f, 'table_name', sep=",")
    con.commit()
    con.close()
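For a single file, a minimal usage sketch; the connection settings here are placeholders you would replace with your own:

#Hypothetical connection settings for the target Postgres instance
dbname, user, password, host, port = 'mydb', 'master', 'secret', 'db-hostname', 5432

execute_copy(file)   #streams one CSV part file into table_name via COPY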

To gain an additional speed boost, since you are using an EMR cluster, you can leverage Python multiprocessing to copy more than one file at once.

from multiprocessing import Pool, cpu_count

#The file paths are plain strings, so they can be handed to the worker processes
with Pool(cpu_count()) as p:
    p.map(execute_copy, [file, file1])
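If Spark produced many part files, here is a small sketch (assuming the CSVs match part-*.csv under the output directory) that collects them with glob and feeds them all to the pool:

import glob
from multiprocessing import Pool, cpu_count

#Hypothetical layout: every CSV part file Spark wrote to the output directory
csv_files = sorted(glob.glob('path/to/save/data/part-*.csv'))

with Pool(cpu_count()) as p:
    p.map(execute_copy, csv_files)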

This is the recommended approach, as Spark JDBC can't be tuned to gain higher write speeds due to connection constraints.

Approach 2: Since you are already using an AWS EMR cluster, you can always leverage Hadoop capabilities to perform your table writes faster. So here we will be using Sqoop export to export our data from EMRFS to the Postgres DB.

#If you are using S3 as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

Why Sqoop? Because Sqoop opens multiple connections to the database based on the number of mappers specified. So if you specify -m as 8, there will be 8 concurrent connection streams writing data to Postgres.

Also, for more information on using Sqoop, go through this AWS Blog, SQOOP Considerations, and the SQOOP Documentation.

If you can hack your way around with code, then Approach 1 will definitely give you the performance boost you seek; if you are comfortable with Hadoop components like Sqoop, then go with the second approach.

Hope it helps!
