Efficient upsert of pandas dataframe to MS SQL Server using pyodbc

Question

I'm trying to upsert a pandas dataframe to a MS SQL Server using pyodbc. I've used a similar approach before to do straight inserts, but the solution I've tried this time is incredibly slow. Is there a more streamlined way to accomplish an upsert than what I have?

import pyodbc

sql_connect = pyodbc.connect('Driver={SQL Server Native Client 11.0}; Server=blank1; Database=blank2; UID=blank3; PWD=blank4')
cursor = sql_connect.cursor()

for index, row in bdf.iterrows():
    res = cursor.execute("UPDATE dbo.MPA_BOOK_RAW SET [SITE]=?, [SHIP_TO]=?, [PROD_LINE]=?, [GROUP_NUMBER]=?, [DESCRIPTION]=?, [ORDER_QTY]=?, [BPS_INCLUDE]=? WHERE [CUST]=? AND [ORDER_NUMBER]=? AND [ORDER_DATE]=? AND [PURCHASE_ORDER]=? AND [CHANNEL]=? AND [ITEM]=? AND [END_DT]=?", 
                    row['SITE'], 
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ORDER_QTY'],
                    row['BPS_INCLUDE'],
                    row['CUST'],
                    row['ORDER_NUMBER'], 
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'], 
                    row['CHANNEL'],
                    row['ITEM'],
                    row['END_DT'])

    if res.rowcount == 0:
        cursor.execute("INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [CUST], [ORDER_NUMBER], [ORDER_DATE], [PURCHASE_ORDER], [CHANNEL], [SHIP_TO], [PROD_LINE], [GROUP_NUMBER], [DESCRIPTION], [ITEM], [ORDER_QTY], [END_DT], [BPS_INCLUDE]) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                       row['SITE'],
                       row['CUST'],
                       row['ORDER_NUMBER'],
                       row['ORDER_DATE'],
                       row['PURCHASE_ORDER'],
                       row['CHANNEL'],
                       row['SHIP_TO'],
                       row['PROD_LINE'],
                       row['GROUP_NUMBER'],
                       row['DESCRIPTION'],
                       row['ITEM'],
                       row['ORDER_QTY'],
                       row['END_DT'],
                       row['BPS_INCLUDE'])

    sql_connect.commit()

cursor.close()
sql_connect.close()

I tried the above with a five row sample of my original ~50k row dataframe and it worked fine. So the logic seems okay. It's just the speed that is an issue.
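Most of the time in the loop above goes to per-row round trips: up to two statements plus a commit for each of the ~50k rows. Whatever upsert strategy is chosen, parameters should at least travel in batches; pyodbc's executemany (with fast_executemany = True) takes one list of row tuples, which can be built from the dataframe as sketched here (the two columns are a hypothetical subset of the real table):

```python
import pandas as pd

# Hypothetical two-column slice of the dataframe from the question
bdf = pd.DataFrame({"SITE": ["A", "B"], "ORDER_QTY": [10, 20]})

# executemany expects a list of parameter tuples, one per dataframe row
params = list(bdf.itertuples(index=False, name=None))

# With a live connection, the whole batch goes to the server in one call:
# cursor.fast_executemany = True
# cursor.executemany(
#     "INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [ORDER_QTY]) VALUES (?, ?)",
#     params,
# )
```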

Answer

Comments to the question suggest uploading the DataFrame to a temporary table and then merging the contents into the main table. Note, however, that the documentation for the T-SQL MERGE statement says:

Performance Tip: The conditional behavior described for the MERGE statement works best when the two tables have a complex mixture of matching characteristics. For example, inserting a row if it does not exist, or updating the row if it does match. When simply updating one table based on the rows of another table, improved performance and scalability can be achieved with basic INSERT, UPDATE, and DELETE statements.
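For reference, the single-statement MERGE form that the tip cautions against would look roughly like this for the example tables used further down (a sketch only; table and column names come from the MCVE below):

```python
# Rough T-SQL MERGE equivalent of the UPDATE + INSERT pair (sketch only)
merge_sql = """\
MERGE actual_table AS a
USING #update_table AS u
    ON a.institution_no = u.institution_no
   AND a.transit_no = u.transit_no
WHEN MATCHED THEN
    UPDATE SET a.branch_name = u.branch_name
WHEN NOT MATCHED THEN
    INSERT (institution_no, transit_no, branch_name)
    VALUES (u.institution_no, u.transit_no, u.branch_name);
"""
```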

In your case the matching criteria are relatively straightforward - just what is effectively a multi-column primary key - so you could simply use an anonymous code block with an UPDATE statement and an INSERT statement as in the following simplified MCVE code.

Minimum requirements:

  • Python 3.6+ for f'...' string formatting
  • SQLAlchemy 1.3 for the fast_executemany argument to create_engine
  • DRIVER=ODBC Driver 17 for SQL Server; and UseFMTONLY=Yes; for reliable fast_executemany INSERTs to a SQL Server #temporary table

from pprint import pprint
import sys
import urllib

import pandas as pd
import pyodbc
import sqlalchemy as sa

print(sys.version)
# 3.7.5 (tags/v3.7.5:5c02a39a0b, Oct 15 2019, 00:11:34) [MSC v.1916 64 bit (AMD64)]
print(
    f"SQLAlchemy {sa.__version__}, pandas {pd.__version__}, pyodbc {pyodbc.version}"
)
# SQLAlchemy 1.3.19, pandas 1.1.2, pyodbc 4.0.30

connection_string = (
    r"DRIVER=ODBC Driver 17 for SQL Server;"
    r"SERVER=(local)\SQLEXPRESS;"
    r"DATABASE=myDb;"
    r"Trusted_Connection=Yes;"
    r"UseFMTONLY=Yes;"
)
sqlalchemy_url = "mssql+pyodbc:///?odbc_connect=" + urllib.parse.quote_plus(
    connection_string
)
engine = sa.create_engine(sqlalchemy_url, fast_executemany=True)

with engine.begin() as conn:
    # set up test environment
    conn.execute(sa.text("DROP TABLE IF EXISTS actual_table;"))
    conn.execute(
        sa.text(
            """\
            CREATE TABLE actual_table (
                institution_no VARCHAR(3), 
                transit_no VARCHAR(5), 
                branch_name VARCHAR(50),
                CONSTRAINT PK_actual_table PRIMARY KEY CLUSTERED 
                    (institution_no, transit_no));
            """
        )
    )
    # actual_table initial state
    conn.execute(
        sa.text(
            """\
            INSERT INTO actual_table (institution_no, transit_no, branch_name) VALUES 
                ('002', '45678', 'Scotiabank branch #45678 - *** UPDATE NEEDED ***'),
                ('003', '67890', 'RBC branch #67890 - Sudbury, ON');
            """
        )
    )
    # test data to be updated or inserted
    update_columns = ["institution_no", "transit_no", "branch_name"]
    update_data = [
        ["004", "12345", "TD branch #12345 - London, ON"],
        ["002", "45678", "Scotiabank branch #45678 - Timmins, ON"],
        ["004", "34567", "TD branch #34567 - Toronto, ON"],
    ]
    df_update = pd.DataFrame(update_data, columns=update_columns)

    # Here's where the real work begins ...
    #
    # Step 1: upload update data
    df_update.to_sql("#update_table", conn, index=False)
    #
    # Step 2: perform the "upsert"
    sql = """\
    SET NOCOUNT ON;
    DECLARE @rows_updated INT = 0;
    DECLARE @rows_inserted INT = 0;
    
    UPDATE a SET a.branch_name = u.branch_name
        FROM actual_table a INNER JOIN #update_table u
            ON a.institution_no = u.institution_no 
                AND a.transit_no = u.transit_no;
    SELECT @rows_updated = @@ROWCOUNT;
    
    INSERT INTO actual_table (institution_no, transit_no, branch_name)
        SELECT institution_no, transit_no, branch_name
        FROM #update_table u
        WHERE NOT EXISTS (
            SELECT * FROM actual_table
            WHERE institution_no = u.institution_no
                AND transit_no = u.transit_no
        );
    SELECT @rows_inserted = @@ROWCOUNT;
    
    SELECT @rows_updated AS rows_updated, @rows_inserted AS rows_inserted;
    """
    result = conn.execute(sa.text(sql)).fetchone()
    print(f"{result[0]} row(s) updated, {result[1]} row(s) inserted")
    # 1 row(s) updated, 2 row(s) inserted

# verify results
with engine.begin() as conn:
    pprint(conn.execute(sa.text("SELECT * FROM actual_table")).fetchall())
    """console output:
    [('002', '45678', 'Scotiabank branch #45678 - Timmins, ON'),
     ('003', '67890', 'RBC branch #67890 - Sudbury, ON'),
     ('004', '12345', 'TD branch #12345 - London, ON'),
     ('004', '34567', 'TD branch #34567 - Toronto, ON')]
    """
