Bulk Insert A Pandas DataFrame Using SQLAlchemy


Problem Description

I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to a Microsoft SQL Server via SQL Alchemy. The pandas.to_sql method, while nice, is slow.

I'm having trouble writing the code...

I'd like to be able to pass this function a pandas DataFrame which I'm calling table, a schema name I'm calling schema, and a table name I'm calling name. Ideally, the function will 1.) delete the table if it already exists. 2.) create a new table 3.) create a mapper and 4.) bulk insert using the mapper and pandas data. I'm stuck on part 3.

Here's my (admittedly rough) code. I'm struggling with how to get the mapper function to work with my primary keys. I don't really need primary keys but the mapper function requires it.

Thanks for any insight.

from sqlalchemy import create_engine, Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e, reflect=True, schema=schema)
    Base = declarative_base(bind=e, metadata=m)
    # 1.) drop the table if it already exists
    t = Table(name, m)
    m.remove(t)
    t.drop(checkfirst=True)
    # 2.) create a new table from the DataFrame's columns
    sqld = SQLDatabase(e, schema=schema, meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e, tables=[sqlt])
    # 3.) create a mapper -- this is where I'm stuck
    class MyClass(Base):
        pass
    mapper(MyClass, sqlt)
    # 4.) bulk insert using the mapper and the pandas data
    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return
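
(A note on step 3: SQLAlchemy's classical mapper() accepts an explicit primary_key argument, so the database table itself does not need a primary-key constraint. Below is a rough, untested sketch of steps 3 and 4 along those lines; the engine URL and the unique column name 'Index' are assumptions, not part of the original question.)

from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import mapper, sessionmaker

def bulk_insert_df(df, schema, name):
    engine = create_engine('mssql+pyodbc://MYDB')
    meta = MetaData(schema=schema)
    # reflect the freshly created table instead of redefining it
    tbl = Table(name, meta, autoload=True, autoload_with=engine)

    class Record(object):
        pass

    # tell the mapper which column to treat as the key; 'Index' is an
    # assumed unique column in the DataFrame -- adjust as needed
    mapper(Record, tbl, primary_key=[tbl.c['Index']])

    session = sessionmaker(bind=engine)()
    session.bulk_insert_mappings(Record, df.to_dict(orient='records'))
    session.commit()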

Recommended Answer

I ran into a similar issue with pd.to_sql taking hours to upload data. The below code bulk inserted the same data in a few seconds.

from io import StringIO  # cStringIO on Python 2

from sqlalchemy import create_engine

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
# work with the raw psycopg2 connection so we can use COPY
connection = engine.raw_connection()
cursor = connection.cursor()

# df is the dataframe containing an index and the columns "Event" and "Day";
# promote the index to an "Index" column to use as the primary key
df.reset_index(inplace=True)
df.rename(columns={'index': 'Index'}, inplace=True)

# create the table, but first drop it if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone
);'''
cursor.execute(command)
connection.commit()

# stream the data with 'to_csv' and StringIO(), then bulk load it with
# psycopg2's 'copy_from'
output = StringIO()
# header=False/index=False: write only the data rows, in column order
df.to_csv(output, sep='\t', header=False, index=False)
# jump back to the start of the stream
output.seek(0)
# null values become ''
cursor.copy_from(output, 'localytics_app2', null="")
connection.commit()
cursor.close()
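
On pandas 0.24 and later, the same COPY idea can be plugged directly into to_sql through its method argument. The sketch below is adapted from the recipe in the pandas documentation; the engine and the table name 'localytics_app2' follow the example above, and it assumes a PostgreSQL target with psycopg2 installed.

import csv
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    # called by to_sql for each batch; issues a single PostgreSQL COPY
    # instead of row-by-row INSERT statements
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)
        columns = ', '.join('"{}"'.format(k) for k in keys)
        table_name = '{}.{}'.format(table.schema, table.name) if table.schema else table.name
        cur.copy_expert('COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns), buf)

df.to_sql('localytics_app2', engine, if_exists='replace', index=False, method=psql_insert_copy)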
