How to get the primary key columns in a pd.DataFrame.to_sql insertion method for a PostgreSQL "upsert"

Problem description

I'm trying to modify the pandas insertion method to use COPY. The purpose is to implement an "upsert" mechanism for a Postgres database.

I'm using this SO answer to create a temp table, copy the data into it, and then insert from it into the target table.

The following code works, but I had to set primary_key to my real table's PK explicitly. The question is: can I get the PK from the variables visible in this scope?

import csv
from io import StringIO
from typing import Iterable

from sqlalchemy.engine.base import Connection
from pandas.io.sql import SQLTable


# Alternative to_sql() *method* for DBs that support COPY FROM
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
def psql_upsert_copy(table: SQLTable, conn: Connection, keys: Iterable, data_iter: Iterable[tuple]):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)
        columns = ', '.join(f'"{k}"' for k in keys)
        excluded_columns = ', '.join(f'EXCLUDED."{k}"' for k in keys)

        # is it possible to get it from the table?
        primary_key = ', '.join(['"PK_col_a"', '"PK_col_b"'])

        if table.schema:
            table_name = f'{table.schema}.{table.name}'
        else:
            table_name = table.name
        sql = f'''
        CREATE TEMP TABLE tmp_table
        ON COMMIT DROP
        AS SELECT * FROM {table_name}
        WITH NO DATA;

        COPY tmp_table ({columns}) FROM STDIN WITH CSV;

        INSERT INTO {table_name}
        SELECT *
        FROM tmp_table
        ON CONFLICT ({primary_key}) DO UPDATE
        SET ({columns}) = ({excluded_columns});
        '''
        cur.copy_expert(sql=sql, file=s_buf)
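A minimal sketch of one way to fill in the hard-coded `primary_key` line, assuming SQLAlchemy 1.4 or later and that the target table already exists in the database. The `conn` argument pandas passes to the method is a SQLAlchemy `Connection`, so the key can be reflected with `sqlalchemy.inspect(conn).get_pk_constraint(...)`. The `SQLTable` object passed in appears to be built from the DataFrame rather than reflected from the database, so it probably does not carry the real key. The helper names `get_primary_key_columns` and `primary_key_sql` are hypothetical.

```python
from typing import List, Optional

from sqlalchemy import inspect
from sqlalchemy.engine import Connection


def get_primary_key_columns(conn: Connection, table_name: str, schema: Optional[str] = None) -> List[str]:
    """Return the primary-key column names of an existing table, in key order."""
    # Inspector.get_pk_constraint() returns a dict like
    # {"constrained_columns": ["PK_col_a", "PK_col_b"], "name": "..."}
    pk = inspect(conn).get_pk_constraint(table_name, schema=schema)
    return list(pk["constrained_columns"])


def primary_key_sql(conn: Connection, table_name: str, schema: Optional[str] = None) -> str:
    """Format the key columns as a quoted list suitable for ON CONFLICT (...)."""
    cols = get_primary_key_columns(conn, table_name, schema)
    if not cols:
        # ON CONFLICT needs a unique index or constraint to infer the conflict target
        raise ValueError(f"table {table_name!r} has no primary key")
    return ", ".join(f'"{c}"' for c in cols)
```

Inside `psql_upsert_copy`, the hard-coded line could then become `primary_key = primary_key_sql(conn, table.name, table.schema)`.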

P.S. Usage is as follows:

df.to_sql(name='original_table_name', con=some_psql_db_engine, if_exists='append', index=False, method=psql_upsert_copy)
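Once the key columns are available as a list (for example from SQLAlchemy's `Inspector.get_pk_constraint`), the temp-table / COPY / `INSERT ... ON CONFLICT` script can be generated from them. A sketch with hypothetical helpers `quote_ident` and `build_upsert_sql`, assuming `columns` is the `keys` list pandas passes in. Two choices differ from the question's SQL: only non-key columns are updated, each with its own `col = EXCLUDED.col` assignment (PostgreSQL 10 and later reject the single-column form `SET ("a") = (EXCLUDED."a")`), and the `INSERT` names its columns so table columns missing from the DataFrame keep their defaults. `table_name` is used as-is, as in the original.

```python
from typing import Sequence


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_upsert_sql(table_name: str, columns: Sequence[str], pk_columns: Sequence[str],
                     tmp_table: str = "tmp_table") -> str:
    """Build the temp-table / COPY / INSERT ... ON CONFLICT script for cursor.copy_expert()."""
    col_list = ", ".join(quote_ident(c) for c in columns)
    pk_list = ", ".join(quote_ident(c) for c in pk_columns)
    # One "col = EXCLUDED.col" per non-key column; fall back to DO NOTHING if all columns are keys
    updates = [f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}" for c in columns if c not in pk_columns]
    action = f"DO UPDATE SET {', '.join(updates)}" if updates else "DO NOTHING"
    return (
        f"CREATE TEMP TABLE {tmp_table} ON COMMIT DROP AS SELECT * FROM {table_name} WITH NO DATA;\n"
        f"COPY {tmp_table} ({col_list}) FROM STDIN WITH CSV;\n"
        # Explicit column list: columns not in the DataFrame get their defaults, not NULL
        f"INSERT INTO {table_name} ({col_list}) SELECT {col_list} FROM {tmp_table}\n"
        f"ON CONFLICT ({pk_list}) {action};"
    )
```

When `to_sql` is given a `chunksize`, pandas may call the method once per chunk inside a single transaction; a fixed temp-table name created `ON COMMIT DROP` would then already exist on the second call, which is why the hypothetical `tmp_table` parameter is exposed.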


Recommended answer

You can do something like this with SQLAlchemy:

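import sqlalchemy
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects.postgresql import insert


def upsert_data(df, url, schema, table, primarykey):
    # primarykey: list of key column names, e.g. ['PK_col_a', 'PK_col_b']
    insrt_vals = df.to_dict(orient='records')
    # A PostgreSQL driver such as psycopg2 must be installed; create_engine loads it from the URL
    engine = sqlalchemy.create_engine(url)
    meta = MetaData(schema=schema)
    # Reflect only the target table (autoload_with works on SQLAlchemy 1.4+)
    table_used = Table(table, meta, autoload_with=engine)
    insrt_stmnt = insert(table_used).values(insrt_vals)

    # On conflict, overwrite every non-key column with the incoming (EXCLUDED) value
    update_columns = {col.name: col for col in insrt_stmnt.excluded if col.name not in primarykey}
    upsert_stmt = insrt_stmnt.on_conflict_do_update(index_elements=primarykey, set_=update_columns)

    # Execute the statement; engine.begin() commits on success and rolls back on error
    with engine.begin() as connect:
        connect.execute(upsert_stmt)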
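With this SQLAlchemy route the key columns need not be passed in at all: a reflected (or declared) `Table` exposes them as `table.primary_key.columns`. A minimal sketch, using a hypothetical `build_upsert` helper and a hypothetical in-memory `example` table, that compiles the statement for the PostgreSQL dialect without a database connection so the generated SQL can be inspected. It assumes the table has at least one non-key column.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert


def build_upsert(table: sa.Table, rows):
    """INSERT ... ON CONFLICT (primary key) DO UPDATE for every non-key column."""
    # A Table object exposes its primary key, so no hard-coded column names are needed
    pk_cols = [c.name for c in table.primary_key.columns]
    stmt = insert(table).values(rows)
    update_columns = {c.name: c for c in stmt.excluded if c.name not in pk_cols}
    return stmt.on_conflict_do_update(index_elements=pk_cols, set_=update_columns)


# Hypothetical table declared in memory; compiling it needs no database connection
metadata = sa.MetaData()
example = sa.Table(
    "example",
    metadata,
    sa.Column("PK_col_a", sa.Integer, primary_key=True),
    sa.Column("PK_col_b", sa.Integer, primary_key=True),
    sa.Column("val", sa.Text),
)
stmt = build_upsert(example, [{"PK_col_a": 1, "PK_col_b": 2, "val": "x"}])
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

With a reflected table, `table_used` from the answer above could be passed to `build_upsert` in place of `example`.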

More info on this can be found here: https://docs.sqlalchemy.org/en/13/dialects/postgresql.html
