Connection is closed when a SQLAlchemy event triggers a Celery task
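Problem description
When one of my unit tests deletes a SQLAlchemy object, the object triggers an after_delete event, which in turn triggers a Celery task that deletes a file from the drive.
The task runs with CELERY_ALWAYS_EAGER = True when testing.
A gist is available to reproduce the issue easily. The example has two tests: one triggers the task inside the event, the other outside the event. Only the one inside the event closes the connection.
To quickly reproduce the error you can run: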
git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py
The stack:
$ python test.py
E
======================================================================
ERROR: test_delete_task (__main__.CeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 73, in test_delete_task
    db.session.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit
    self.transaction.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit
    self._prepare_impl()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl
    self.session.flush()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
    self._flush(objects)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed
----------------------------------------------------------------------
Ran 1 test in 0.014s
FAILED (errors=1)
I think I found the problem: it's in how you set up your Celery task. If you remove the app context call from your Celery setup, everything runs fine:
class ContextTask(TaskBase):
    abstract = True

    def __call__(self, *args, **kwargs):
        # deleted --> with app.app_context():
        return TaskBase.__call__(self, *args, **kwargs)
There's a big warning in the SQLAlchemy docs about never modifying the session during after_delete events: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete
So I suspect that with app.app_context(): is being called during the delete, trying to attach to and/or modify the session that Flask-SQLAlchemy stores in the app object, and therefore the whole thing is bombing.
Flask-SQLAlchemy does a lot of magic behind the scenes for you, but you can bypass this and use SQLAlchemy directly. If you need to talk to the database during the delete event, you can create a new session to the db:
@celery.task()
def my_task():
    # obviously here I create a new object
    session = db.create_scoped_session()
    session.add(User(id=13, value="random string"))
    session.commit()
    return

# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
    my_task.delay(target.value)
But it sounds like you don't need this; you're just trying to delete an image path. In that case, I would just change your task so it takes a path:
import os

@celery.task()
def my_task(image_path):
    # only touches the filesystem, never the database session
    os.remove(image_path)
Hopefully that's helpful. Let me know if any of that doesn't work for you. Thanks for the very detailed setup, it really helped in debugging.
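As a rough illustration of the path-based approach, here is a minimal sketch of what the task body could look like once it takes only a path. It assumes Python 3 (the traceback above is from Python 2.7, where FileNotFoundError does not exist), and the name remove_image is hypothetical. In the real application the function would sit under @celery.task(); the decorator is left out here so the sketch runs with the standard library alone.

```python
import os
import tempfile


def remove_image(image_path):
    """Delete the file at image_path; return True if a file was removed.

    Hypothetical stand-in for the Celery task body. It receives a plain
    string, so it never needs a database session or an app context.
    """
    try:
        os.remove(image_path)
    except FileNotFoundError:
        # The file is already gone (for example, the task ran twice).
        return False
    return True


if __name__ == "__main__":
    # Create a throwaway file to stand in for an uploaded image.
    fd, path = tempfile.mkstemp(suffix=".png")
    os.close(fd)
    print(remove_image(path))  # first call removes the file
    print(remove_image(path))  # second call finds nothing to remove
```

Because the function only receives a string, the after_delete listener can read the path from target and hand it over without the task ever touching the session that is in the middle of a flush.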