Bulk Insert A Pandas DataFrame Using SQLAlchemy
Problem description
I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to a Microsoft SQL Server via SQLAlchemy. The pandas.to_sql method, while nice, is slow.
I'm having trouble writing the code...
I'd like to be able to pass this function a pandas DataFrame which I'm calling table, a schema name I'm calling schema, and a table name I'm calling name. Ideally, the function will 1.) delete the table if it already exists, 2.) create a new table, 3.) create a mapper, and 4.) bulk insert using the mapper and the pandas data. I'm stuck on part 3.
Here's my (admittedly rough) code. I'm struggling with how to get the mapper function to work with my primary keys. I don't really need primary keys, but the mapper function requires them.
I appreciate your insights.
from sqlalchemy import create_engine, Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e, reflect=True, schema=schema)
    Base = declarative_base(bind=e, metadata=m)
    # 1.) drop the table if it already exists
    t = Table(name, m)
    m.remove(t)
    t.drop(checkfirst=True)
    # 2.) create a new table shaped like the DataFrame
    sqld = SQLDatabase(e, schema=schema, meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e, tables=[sqlt])
    # 3.) create a mapper -- this is where I'm stuck: mapper() needs a primary key
    class MyClass(Base):
        pass
    mapper(MyClass, sqlt)
    # 4.) bulk insert using the mapper and the pandas data
    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return
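The mapper in step 3 fails precisely because the reflected table has no primary key. A minimal sketch of the workaround — declare the mapped class with a synthetic integer key yourself, then feed it the DataFrame records via bulk_insert_mappings. The class and table names here are hypothetical, and an in-memory SQLite database stands in for SQL Server purely for illustration:

```python
# Sketch: a declaratively mapped class with an explicit synthetic primary
# key ("Index"), so the ORM accepts it and bulk_insert_mappings works.
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, Text
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Record(Base):
    __tablename__ = 'records'          # hypothetical table name
    Index = Column(Integer, primary_key=True)  # synthetic key the mapper needs
    Event = Column(Text)

engine = create_engine('sqlite://')    # illustration only; not SQL Server
Base.metadata.create_all(engine)

df = pd.DataFrame({'Event': ['a', 'b', 'c']})
# promote the pandas index to a real "Index" column to serve as the key
df = df.reset_index().rename(columns={'index': 'Index'})

with Session(engine) as s:
    s.bulk_insert_mappings(Record, df.to_dict(orient='records'))
    s.commit()
```

The same pattern should apply against the real server once the declarative class mirrors the target table's columns.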
Recommended answer
I ran into a similar issue with pd.to_sql taking hours to upload data. The code below bulk inserted the same data in a few seconds.
from io import StringIO  # cStringIO.StringIO on Python 2
from sqlalchemy import create_engine

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
# drop to the raw psycopg2 connection to use copy_from
connection = engine.raw_connection()
cursor = connection.cursor()

# df is the dataframe containing an index and the columns "Event" and "Day"
# create an Index column to use as the primary key
df.reset_index(inplace=True)
df.rename(columns={'index': 'Index'}, inplace=True)

# create the table, but first drop it if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone
);'''
cursor.execute(command)
connection.commit()

# stream the data using 'to_csv' and StringIO(); then use psycopg2's 'copy_from'
output = StringIO()
# don't write the pandas index; "Index" is already a regular column
df.to_csv(output, sep='\t', header=False, index=False)
# jump to the start of the stream
output.seek(0)
# null values become ''
cursor.copy_from(output, 'localytics_app2', null="")
connection.commit()
cursor.close()
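The answer above targets PostgreSQL, while the question was about SQL Server. For mssql+pyodbc, a comparable speedup is available through pyodbc's fast_executemany mode, which SQLAlchemy (1.3+) exposes as a create_engine flag. A hedged sketch — the connection URL and table name are placeholders, and this keeps pandas' to_sql rather than a hand-rolled mapper:

```python
# Sketch: fast bulk upload to SQL Server by enabling pyodbc's
# fast_executemany on the SQLAlchemy engine; to_sql then batches inserts.
import pandas as pd
from sqlalchemy import create_engine

def bulk_upload(df, name, url, schema=None):
    """Drop/recreate the target table and bulk insert the DataFrame."""
    engine = create_engine(url, fast_executemany=True)
    # if_exists='replace' covers steps 1 and 2 (drop + recreate) in one call
    df.to_sql(name, engine, schema=schema, if_exists='replace', index=False)
```

Usage would look like bulk_upload(df, 'localytics_app2', 'mssql+pyodbc://MYDB'), with the URL adapted to your DSN; no separate mapper or primary key is needed on this path.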