Django: how to wrap a bulk update/insert operation in transaction?


Question

Here's my use case:

  • I have multiple celery tasks that run in parallel
  • Each task could bulk create or update many objects. For this I'm using django-bulk

So basically I'm using a very convenient function, insert_or_update_many, which:

  1. First performs a SELECT
  2. If objects are found, updates them
  3. Otherwise creates them
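The three steps above can be sketched in pure Python, using a dict keyed by primary key as a stand-in for a database table (this is a hypothetical illustration of the pattern, not django-bulk's actual implementation, which issues real SQL):

```python
def insert_or_update_many(table, rows, key="id"):
    """Sketch of the select / update / insert pattern.
    `table` is a dict {key: row} standing in for a DB table."""
    # Step 1: one "select" to find which keys already exist
    existing = {r[key] for r in rows if r[key] in table}
    # Step 2: rows whose key was found become updates
    updates = [r for r in rows if r[key] in existing]
    # Step 3: the remaining rows become inserts
    inserts = [r for r in rows if r[key] not in existing]
    for r in updates + inserts:
        table[r[key]] = r
    return updates, inserts
```

The race described below lives in the gap between step 1 and step 3: the membership test and the write are not atomic with respect to other writers.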

But this introduces concurrency problems. For example: if an object did not exist during step 1, it is added to a list of objects to be inserted later. But in the meantime another Celery task may have created that object, and when my task tries to perform the bulk insert (step 3) I get a duplicate entry error.

I guess I need to wrap the 3 steps in a 'blocking' block. I've read around about transactions and I've tried to wrap steps 1, 2 and 3 in a with transaction.commit_on_success(): block:

with transaction.commit_on_success():
    cursor.execute(sql, parameters)
    existing = set(cursor.fetchall())
    if not skip_update:
        # Find the objects that need to be updated
        update_objects = [o for (o, k) in object_keys if k in existing]
        _update_many(model, update_objects, keys=keys, using=using)
    # Find the objects that need to be inserted.
    insert_objects = [o for (o, k) in object_keys if k not in existing]
    # Filter out any duplicates in the insertion
    filtered_objects = _filter_objects(con, insert_objects, key_fields)
    _insert_many(model, filtered_objects, using=using)

But this does not work for me. I'm not sure I have a full understanding of transactions. I basically need a block where I can put several operations, being sure no other process or thread is accessing (in write) my db resources.

Answer

"I basically need a block where I can put several operations being sure no other process or thread is accessing (in write) my db resources."

Django transactions will not, in general, guarantee that for you. If you're coming from other areas of computer science you naturally think of a transaction as blocking in this way, but in the database world there are different kinds of locks, at different isolation levels, and they vary for each database. So to ensure that your transactions do this you're going to have to learn about transactions, about locks and their performance characteristics, and about the mechanisms supplied by your database for controlling them.

However, having a bunch of processes all trying to lock the table in order to carry out competing inserts does not sound like a good idea. If collisions were rare you could do a form of optimistic locking and just retry the transaction if it fails. Or perhaps you can direct all of these celery tasks to a single process (there's no performance advantage to parallelizing this if you're going to acquire a table lock anyway).

My suggestion would be to start out by forgetting the bulk operations and just doing one row at a time using Django's update_or_create. As long as your database has constraints that prevent duplicate entries (which it sounds like it does), this should be free of the race conditions you describe above. If the performance really does turn out to be unacceptable, then look into more complex options.
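For reference, update_or_create's get-then-save semantics can be sketched in pure Python, with a list of dicts standing in for model rows (a hypothetical stand-in only; the real Django method additionally runs inside a transaction and handles IntegrityError internally, which is what lets it cope with exactly this race):

```python
def update_or_create(table, defaults=None, **lookup):
    """Sketch of update_or_create semantics on a list of dicts."""
    defaults = defaults or {}
    for row in table:
        # Does an existing row match all the lookup fields?
        if all(row.get(k) == v for k, v in lookup.items()):
            row.update(defaults)   # found: update it in place
            return row, False      # (object, created=False)
    row = {**lookup, **defaults}   # not found: create a new row
    table.append(row)
    return row, True               # (object, created=True)
```

Doing one row at a time this way trades throughput for simplicity: each row is its own small unit of work, so a conflict affects only that row.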

Taking the optimistic concurrency approach means that rather than preventing conflicts (by acquiring a table lock, say), you just proceed as normal and then retry the operation if there turns out to be a problem. In your case it might look something like:

from django.db import IntegrityError, transaction

while True:
    try:
        with transaction.atomic():
            ...  # do your bulk insert / update operation
    except IntegrityError:
        pass
    else:
        break

So if you run into your race condition, the resulting IntegrityError will cause the transaction.atomic() block to roll back any changes that have been made, and the while loop will force a retry of the transaction (where presumably the bulk operation will now see the newly-existing row and mark it for updating rather than insertion).

This kind of approach can work really well if collisions are rare, and really badly if they are frequent.
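One practical refinement of that retry loop, sketched below with a local IntegrityError stand-in so no database is needed: cap the number of attempts, so that under frequent collisions the task fails loudly instead of spinning forever.

```python
class IntegrityError(Exception):
    """Local stand-in for django.db.IntegrityError in this sketch."""

def run_with_retry(operation, max_attempts=5):
    """Bounded variant of the `while True` retry loop above."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except IntegrityError:
            if attempt == max_attempts:
                raise  # give up after repeated collisions

# Simulated bulk operation that collides once, then succeeds.
calls = {"n": 0}
def flaky_bulk_insert():
    calls["n"] += 1
    if calls["n"] == 1:
        raise IntegrityError("duplicate entry")
    return "done"
```

With rare collisions the extra attempts are almost never used; with frequent collisions the cap turns silent livelock into a visible failure you can alert on.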
