当SQLAlchemy事件触发Celery任务时,连接关闭 [英] Connection is closed when a SQLAlchemy event triggers a Celery task

查看:794
本文介绍了当SQLAlchemy事件触发Celery任务时,连接关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我的一个单元测试删除一个SQLAlchemy对象时,该对象触发一个after_delete事件,触发Celery任务从驱动器中删除一个文件。



在测试时是 CELERY_ALWAYS_EAGER = True

要点轻松重现问题

这个例子有两个测试。一个在事件触发任务,另一个在事件外触发。

要快速重现错误,您可以运行:

  git clone https://gist.github.com/5762792fc1d628843697.git 
cd 5762792fc1d628843697
virtualenv venv
。 venv / bin / activate
pip install -r requirements.txt
python test.py

堆栈:

  $ python test.py 
E
====== ================================================== ==============
错误:test_delete_task(__main __。CeleryTestCase)
-------------------- --------------------------------------------------
Traceback(最近一次调用最后一次):
在test_delete_task
文件test.py中,第73行db.session.commit()
文件/ home / brice /代码/ 5762792fc1d628843697 / venv / local / lib / python2.7 / site-packages / sqlalchemy / orm / scoping.py,第150行,在
返回getattr(self.registry(),name)(* args ,** kwargs)
文件/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py,第776行,在提交
self.transaction.commit()
文件/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py,第377行,提交
self._prepare_impl()
文件/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py,第357行,在_prepare_impl
self.session.flush()
文件/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session .py,第1919行,在
self._flush(objects)
文件中/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/ orm / session.py,行2037,在_flush
transaction.rollback(_capture_exception = True)
文件/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site / home / brice / Code / 5762792fc1d628843697 / venv / local / bash / code / 5762792fc1d628843697/ packages / sqlalchemy / util / langhelpers.py,第63行,在__exit__
compat.reraise(type_,value,traceback) lib / python2.7 / site-packages / sqlalchemy / orm / session.py,第2037行,在_flush
transaction.rollback(_capture_exception = True)
文件/ home / brice / Code / 5762 792fc1d628843697 / venv / local / lib / python2.7 / site-packages / sqlalchemy / orm / session.py,第393行,回滚
self._assert_active(prepared_ok = True,rollback_ok = True)
文件/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py,第223行,在_assert_active
中引发sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError:此事务已关闭

-------------------------------- --------------------------------------
Ran 1测试0.014s

FAILED(errors = 1)


解决方案

我觉得我发现了这个问题 - 这是你如何设置你的芹菜任务。如果你从celery设置中删除应用程序上下文调用,一切都运行正常:

$> $ $ c $> abstract = True

def __call __(self,* args,** kwargs):
#deleted - >与app.app_context():
返回TaskBase .__调用__(self,* args,** kwargs)

SQLAlchemy文档中有一个很大的警告,那就是在after_delete事件期间从不修改会话: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete



所以我怀疑在删除期间调用app.app_context():的,试图附加到和/或修改会话Flask-SQLAlchemy存储在应用程序对象中,因此整个事情就是轰炸。

Flask-SQlAlchemy在幕后为你做了许多魔术,但你可以绕过这个并直接使用SQLAlchemy。如果您需要在删除事件期间与数据库交谈,您可以创建一个新的会话到数据库:

  @celery。 ()
def my_task():
#显然这里我创建了一个新的对象
session = db.create_scoped_session()
session.add(User(id = 13,value =random string))
session.commit()
return

但是这听起来像你不需要这个,你只是想删除一个图像路径。在这种情况下,我只是改变你的任务,所以它需要一个路径:

 #instance将调用任务
@ event.listens_for(User,after_delete)
def after_delete(mapper,connection,target):
my_task.delay(target.value)
$ b $ celery.task )
def my_task(image_path):
os.remove(image_path)

希望这有帮助 - 让我知道,如果有任何不适合你。感谢您的非常详细的设置,它真的帮助调试。


When one of my unit tests deletes a SQLAlchemy object, the object triggers an after_delete event which triggers a Celery task to delete a file from the drive.

The task is CELERY_ALWAYS_EAGER = True when testing.

gist to reproduce the issue easily

The example has two tests. One triggers the task in the event, the other outside the event. Only the one in the event closes the connection.

To quickly reproduce the error you can run:

git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py

The stack:

$     python test.py
E
======================================================================
ERROR: test_delete_task (__main__.CeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 73, in test_delete_task
    db.session.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit
    self.transaction.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit
    self._prepare_impl()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl
    self.session.flush()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
    self._flush(objects)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed

----------------------------------------------------------------------
Ran 1 test in 0.014s

FAILED (errors=1)

解决方案

I think I found the problem - it's in how you set up your Celery task. If you remove the app context call from your celery setup, everything runs fine:

class ContextTask(TaskBase):
    abstract = True

    def __call__(self, *args, **kwargs):
        # deleted --> with app.app_context():
        return TaskBase.__call__(self, *args, **kwargs)

There's a big warning in the SQLAlchemy docs about never modifying the session during after_delete events: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

So I suspect the with app.app_context(): is being called during the delete, trying to attach to and/or modify the session that Flask-SQLAlchemy stores in the app object, and therefore the whole thing is bombing.

Flask-SQlAlchemy does a lot of magic behind the scenes for you, but you can bypass this and use SQLAlchemy directly. If you need to talk to the database during the delete event, you can create a new session to the db:

@celery.task()
def my_task():
    # obviously here I create a new object
    session = db.create_scoped_session()
    session.add(User(id=13, value="random string"))
    session.commit()
    return

But it sounds like you don't need this, you're just trying to delete an image path. In that case, I would just change your task so it takes a path:

# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
    my_task.delay(target.value)

@celery.task()
def my_task(image_path):
    os.remove(image_path) 

Hopefully that's helpful - let me know if any of that doesn't work for you. Thanks for the very detailed setup, it really helped in debugging.

这篇关于当SQLAlchemy事件触发Celery任务时,连接关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆