INSERT or UPDATE bulk data from dataframe/CSV to PostgreSQL database
Problem description
Requirement: insert new data and update existing data in bulk (row count > 1000) from a dataframe/CSV (whichever suits) and save it in a PostgreSQL database.
Table: TEST_TABLE
CREATE TABLE TEST_TABLE (
itemid varchar(100) NOT NULL PRIMARY KEY,
title varchar(255),
street varchar(10),
pincode VARCHAR(100));
INSERT: ['756252', 'tom title', 'APC Road', '598733' ],
['75623', 'dick title', 'Bush Road', '598787' ],
['756211', 'harry title', 'Obama Street', '598733' ]
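These three rows are assumed to already exist in the table before the upsert. A minimal sketch of seeding them, assuming a plain psycopg2 connection named conn (not shown in the original question):

seed_rows = [
    ('756252', 'tom title', 'APC Road', '598733'),
    ('75623', 'dick title', 'Bush Road', '598787'),
    ('756211', 'harry title', 'Obama Street', '598733'),
]
with conn.cursor() as cur:
    # Insert the initial rows; itemid is the primary key.
    cur.executemany(
        "INSERT INTO TEST_TABLE (itemid, title, street, pincode) VALUES (%s, %s, %s, %s)",
        seed_rows)
conn.commit()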
Dataframe contents:
data = [['756252', 'tom new title', 'Unknown Road', 'pin changed' ],
['75623', 'dick new title', 'Bush Road changed', '598787 also changed' ],
['756211', 'harry title', 'Obama Street', '598733'],
['7562876', 'new1 data title', 'A Street', '598730'],
['7562345', 'new2 data title', 'B Street', '598731'],
['7562534', 'new3 data title', 'C Street', '598732'],
['7562089', 'new4 data title', 'D Street', '598733']]
df = pd.DataFrame(data, columns = ['itemid', 'title', 'street', 'pincode'])
I want to UPDATE the records with the same itemid and INSERT the new records. The data will be huge (the CSV file created from the dataframe is more than 50 MB).
Programming language used: Python
Database: PostgreSQL
Recommended answer
In this particular case it is better to drop down to the DB-API level, because you need some tools that are not exposed even by SQLAlchemy Core directly, such as copy_expert(). That can be done using raw_connection(). If your source data is a CSV file, you do not need pandas in this case at all. Start by creating a temporary staging table, copy the data to the temp table, and insert into the destination table with conflict handling:
conn = engine.raw_connection()
try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")
        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)
        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")
except:
    conn.rollback()
    raise
else:
    conn.commit()
finally:
    conn.close()
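Note that the snippet above assumes an already-created SQLAlchemy engine and a header-less CSV file. A minimal sketch of preparing both (the connection string and file name are placeholders, not part of the original answer):

from sqlalchemy import create_engine

# Placeholder DSN; substitute your own host, user, password and database.
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")

# Write the dataframe without header or index so that the plain
# "COPY ... FROM STDIN WITH CSV" above can consume it directly.
df.to_csv("your_source.csv", header=False, index=False)

If the file does contain a header row, the COPY statement can be told to skip it by using WITH CSV HEADER instead.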
If on the other hand your source data is the DataFrame, you can still use COPY by passing a function as method= to to_sql(). The function could even hide all the above logic:
import csv
from io import StringIO

from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu
                 ON kcu.constraint_name = tco.constraint_name
                AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
                 AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}

        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]

        # Build conflict_target and the SET list for the upsert
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))
        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """
        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)
You would then use it like this:
df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")
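As a quick sanity check (not part of the original answer), you could read the table back with the same engine and confirm that the pre-existing itemids now carry the values from the dataframe while the new itemids appear:

import pandas as pd

# Read the table back; the three pre-existing itemids should show the
# dataframe's title/street/pincode and the four new itemids should appear.
result = pd.read_sql(
    "SELECT itemid, title, street, pincode FROM test_table ORDER BY itemid",
    engine)
print(result)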
Using this method, upserting ~1,000,000 rows took about 16 seconds on this machine with a local database.