SQLAlchemy bulk update strategies


Question


I am currently writing a web app (Flask) using SQLAlchemy (on GAE, connecting to Google's cloud MySQL) and needing to do bulk updates of a table. In short, a number of calculations are done resulting in a single value needing to be updated on 1000's of objects. At the moment I'm doing it all in a transaction, but still at the end, the flush/commit is taking ages.


The table has an index on id and this is all carried out in a single transaction. So I believe I've avoided the usual mistakes, but it is still very slow.

INFO     2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}


From my understanding there is no way to do a bulk update in SQL actually, and the statement above ends up being multiple UPDATE statements being sent to the server.
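That is, SQLAlchemy batches the parameter sets through the DBAPI's "executemany" path: one statement template, many parameter dictionaries, but still one UPDATE per row on the server side. A minimal sketch of that behaviour at the Core level, using an in-memory SQLite database and illustrative ids/values (the question targets MySQL, but the mechanism is the same):

```python
# Sketch: one UPDATE template executed with a list of parameter dicts
# ("executemany"). Table name and values here are illustrative.
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE wallet (id TEXT PRIMARY KEY, balance FLOAT)"))
    conn.execute(
        text("INSERT INTO wallet (id, balance) VALUES (:id, :balance)"),
        [{"id": "a", "balance": 0.0}, {"id": "b", "balance": 0.0}],
    )
    # Passing a list of dicts makes SQLAlchemy use executemany:
    # one round trip for the batch, but still row-by-row UPDATEs.
    conn.execute(
        text("UPDATE wallet SET balance = :balance WHERE id = :id"),
        [{"id": "a", "balance": 1.87}, {"id": "b", "balance": 1.59}],
    )
    rows = dict(conn.execute(text("SELECT id, balance FROM wallet")).all())
```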


I've tried using Session.bulk_update_mappings() but that doesn't seem to actually do anything :( Not sure why, but the updates never actually happen. I can't see any examples of this method actually being used (including in the performance suite) so not sure if it is intended to be used.
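For reference, a minimal self-contained sketch of how Session.bulk_update_mappings() is meant to be called, using a toy Wallet model on in-memory SQLite (model and values are illustrative): each mapping must include the primary key, and nothing reaches the database until a flush/commit, which is one possible reason the updates appeared to never happen:

```python
# Sketch of Session.bulk_update_mappings() usage; toy model, not the
# questioner's actual schema. Requires SQLAlchemy 1.4+.
from sqlalchemy import Column, Float, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Wallet(Base):
    __tablename__ = "wallet"
    id = Column(String, primary_key=True)
    balance = Column(Float)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Wallet(id="a", balance=0.0), Wallet(id="b", balance=0.0)])
    session.commit()

    # The primary key ("id") identifies each row; the remaining keys
    # become the SET clause of the emitted UPDATE statements.
    session.bulk_update_mappings(
        Wallet,
        [{"id": "a", "balance": 1.5}, {"id": "b", "balance": 2.5}],
    )
    session.commit()  # without a commit/flush, the UPDATEs never run

    balances = {w.id: w.balance for w in session.query(Wallet)}
```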


One technique I've seen discussed is doing a bulk insert into another table and then doing an UPDATE JOIN. I've given it a test, like below, and it seems to be significantly faster.

wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')


But the problem now is how to structure my code. I'm using the ORM and I need to somehow not 'dirty' the original Wallet objects so that they don't get committed in the old way. I could just create these Ledger objects instead and keep a list of them about and then manually insert them at the end of my bulk operation. But that almost smells like I'm replicating some of the work of the ORM mechanism.


Is there a smarter way to do this? So far my brain is going down something like:

class Wallet(Base):
    ...
    _balance = Column(Float)
    ...

    @property
    def balance(self):
        # first check if we have a ledger of the same id
        # and return the amount in that, otherwise...
        return self._balance

    @balance.setter
    def balance(self, amount):
        l = Ledger(id=self.id, amount=amount)
        # add l to a list somewhere then process later

# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE


As I said, this all seems to be fighting against the tools I (may) have. Is there a better way to be handling this? Can I tap into the ORM mechanism to be doing this? Or is there an even better way to do the bulk updates?


Or is there maybe something clever with events and sessions? Maybe before_flush?


EDIT 2: So I have tried to tap into the event machinery and now have this:

@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
    ledgers = []

    if session.dirty:
        for elem in session.dirty:
            if ( session.is_modified(elem, include_collections=False) ):
                if isinstance(elem, Wallet):
                    session.expunge(elem)
                    ledgers.append(Ledger(id=elem.id, amount=elem.balance))

    if ledgers:
        session.bulk_save_objects(ledgers)
        session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
        session.execute('TRUNCATE ledger')


Which seems pretty hacky and evil to me, but appears to work OK. Any pitfalls, or better approaches?

- Matt

Answer


What you're essentially doing is bypassing the ORM in order to optimize the performance. Therefore, don't be surprised that you're "replicating the work the ORM is doing" because that's exactly what you need to do.


Unless you have a lot of places where you need to do bulk updates like this, I would recommend against the magical event approach; simply writing the explicit queries is much more straightforward.


What I recommend doing is using SQLAlchemy Core instead of the ORM to do the update:

ledger = Table("ledger", db.metadata,
    Column("wallet_id", Integer, primary_key=True),
    Column("new_balance", Float),
    prefixes=["TEMPORARY"],
)


wallets = db_session.query(Wallet).all()

# figure out new balances
balance_map = {}
for w in wallets:
    balance_map[w.id] = calculate_new_balance(w)

# create temp table with balances we need to update
ledger.create(bind=db.session.get_bind())

# insert update data
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
                                           for k, v in balance_map.items()]))

# perform update
db.session.execute(Wallet.__table__
                         .update()
                         .values(balance=ledger.c.new_balance)
                         .where(Wallet.__table__.c.id == ledger.c.wallet_id))

# drop temp table
ledger.drop(bind=db.session.get_bind())

# commit changes
db.session.commit()
