Best way to perform bulk insert SQLAlchemy
I have a table called products, which has the following columns: id, product_id, data, activity_id.
What I am essentially trying to do is copy a large batch of existing products, update their activity_id, and create the copies as new entries in the products table.
Example:
I already have 70 existing entries in products with activity_id 2.
Now I want to create another 70 entries with the same data, except for an updated activity_id.
I could have thousands of existing entries that I'd like to copy, setting the copied entries' activity_id to a new id.
products = self.session.query(model.Products).filter(filter1, filter2).all()
This returns all the existing products for a filter.
Then I iterate through products, then simply clone existing products and just update activity_id field.
for product in products:
    product.activity_id = new_id

self.uow.skus.bulk_save_objects(simulation_skus)
self.uow.flush()
self.uow.commit()
What is the best/fastest way to do these bulk inserts so that they take less time? As of now the performance is OK, but is there a better solution?
You don't need to load these objects locally, all you really want to do is have the database create these rows.
You essentially want to run a query that creates the rows from the existing rows:
INSERT INTO products (product_id, data, activity_id)
SELECT product_id, data, 2 -- the new activity_id value
FROM products
WHERE activity_id = old_id
The above query would run entirely on the database server; this is far preferable to loading your rows into Python objects and then sending all the Python data back to the server to populate INSERT statements for each new row.
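To see the server-side copy in isolation, here is a minimal sketch using only the standard-library sqlite3 module. The schema mirrors the columns from the question; the in-memory database, the 70 sample rows, and the old_id/new_id values are assumptions made up for illustration.

```python
import sqlite3

# In-memory database with a table shaped like the one in the question
# (assumed schema: auto-incrementing id plus three data columns).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "product_id INTEGER, data TEXT, activity_id INTEGER)"
)

# 70 sample rows that all belong to the old activity.
old_id, new_id = 2, 3
conn.executemany(
    "INSERT INTO products (product_id, data, activity_id) VALUES (?, ?, ?)",
    [(n, f"data-{n}", old_id) for n in range(70)],
)

# One statement copies every matching row; no row data passes through Python.
# The primary key is left out so the database assigns fresh ids.
conn.execute(
    "INSERT INTO products (product_id, data, activity_id) "
    "SELECT product_id, data, ? FROM products WHERE activity_id = ?",
    (new_id, old_id),
)
conn.commit()

copied = conn.execute(
    "SELECT COUNT(*) FROM products WHERE activity_id = ?", (new_id,)
).fetchone()[0]
print(copied)
```

Only the two id values travel from Python to the database; the copied rows themselves never leave it.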
Queries like that are something you could do with SQLAlchemy core, the half of the API that deals with generating SQL statements. However, you can use a query built from a declarative ORM model as a starting point. You'd need to:
1. Access the Table instance for the model, as that then lets you create an INSERT statement via the Table.insert() method. You could also get the same object from the models.Product query, more on that later.
2. Access the statement that would normally fetch the data for your Python instances for your filtered models.Product query; you can do so via the Query.statement property.
3. Update the statement to replace the included activity_id column with your new value, and remove the primary key (I'm assuming that you have an auto-incrementing primary key column).
4. Apply that updated statement to the Insert object for the table via Insert.from_select().
5. Execute the generated INSERT INTO ... FROM ... query.
Step 1 can be achieved by using the SQLAlchemy introspection API; the inspect() function, applied to a model class, gives you a Mapper instance, which in turn has a Mapper.local_table attribute.
Steps 2 and 3 require a little juggling with the Select.with_only_columns() method to produce a new SELECT statement in which the column has been swapped out. You can't easily remove a column from a select statement; we can, however, loop over the existing columns in the query to 'copy' them across to the new SELECT, and make our replacement at the same time.
Step 4 is then straightforward: Insert.from_select() needs the columns to insert into and the SELECT query. We have both, because the SELECT object also gives us its columns.
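The Insert.from_select() step can also be tried on its own with a plain Core table, without an ORM model. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later (it uses the positional select() call style) and an in-memory SQLite database; the table definition, the three sample rows, and the old_id/new_id values are made up for illustration.

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table,
    create_engine, func, insert, literal, select,
)

# Hypothetical table definition matching the columns in the question.
metadata = MetaData()
products = Table(
    "products", metadata,
    Column("id", Integer, primary_key=True),
    Column("product_id", Integer),
    Column("data", String),
    Column("activity_id", Integer),
)

engine = create_engine("sqlite://")  # in-memory database, assumption for the demo
metadata.create_all(engine)

old_id, new_id = 2, 3

# SELECT the non-key columns, substituting a literal for activity_id.
source = select(
    products.c.product_id,
    products.c.data,
    literal(new_id).label("activity_id"),
).where(products.c.activity_id == old_id)

# INSERT INTO products (...) SELECT ...; the primary key is omitted on purpose.
stmt = insert(products).from_select(
    ["product_id", "data", "activity_id"], source
)

with engine.begin() as conn:
    # Seed a few rows for the old activity, then copy them in one statement.
    conn.execute(
        insert(products),
        [{"product_id": n, "data": f"data-{n}", "activity_id": old_id}
         for n in range(3)],
    )
    conn.execute(stmt)
    copied = conn.execute(
        select(func.count())
        .select_from(products)
        .where(products.c.activity_id == new_id)
    ).scalar_one()

print(stmt)
print(copied)
```

Listing the target column names explicitly keeps the order of the INSERT columns and the SELECT columns visibly aligned, which is the one thing from_select() relies on.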
Here is the code for generating your INSERT; the **replace keyword arguments are the columns you want to replace when inserting:
from sqlalchemy import inspect, literal
from sqlalchemy.sql import ClauseElement

def insert_from_query(model, query, **replace):
    # The SQLAlchemy core definition of the table
    table = inspect(model).local_table
    # and the underlying core select statement to source new rows from
    select = query.statement

    # validate assumptions: make sure the query produces rows from the above table
    assert table in select.froms, f"{query!r} must produce rows from {model!r}"
    assert all(c.name in select.columns for c in table.columns), f"{query!r} must include all {model!r} columns"

    # updated select, replacing the indicated columns
    as_clause = lambda v: literal(v) if not isinstance(v, ClauseElement) else v
    replacements = {name: as_clause(value).label(name) for name, value in replace.items()}
    from_select = select.with_only_columns([
        replacements.get(c.name, c)
        for c in table.columns
        if not c.primary_key
    ])

    return table.insert().from_select(from_select.columns, from_select)
I included a few assertions about the model and query relationship, and the code accepts arbitrary column clauses as replacements, not just literal values. You could use func.max(models.Product.activity_id) + 1 as a replacement value (wrapped as a subselect), for example.
The above function executes steps 1-4, producing the desired INSERT SQL statement when printed (I created a products model and query that I thought might be representative):
>>> print(insert_from_query(models.Product, products, activity_id=2))
INSERT INTO products (product_id, data, activity_id) SELECT products.product_id, products.data, :param_1 AS activity_id
FROM products
WHERE products.activity_id != :activity_id_1
All you have to do is execute it:
insert_stmt = insert_from_query(models.Product, products, activity_id=2)
self.session.execute(insert_stmt)