"ResourceClosedError: The transaction is closed" error with celery beat and sqlalchemy + pyramid app
Question
I have a Pyramid app called `mainsite`.
The site works in a fairly asynchronous manner, mostly through threads launched from the views to carry out the backend operations.
It connects to MySQL with SQLAlchemy and uses ZopeTransactionExtension for session management.
So far the application has been running great.
I need to run periodic jobs on it, and those jobs need to use some of the same asynchronous functions that are launched from the views.
I used APScheduler but ran into issues with it, so I thought of using celery beat as a separate process that treats `mainsite` as a library and imports the functions it needs.
My Celery configuration is as follows:
```python
# celeryconfig.py
from datetime import timedelta

from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \
    AUTH_DELETE_TIME

BROKER_URL = 'sqla+mysql://em:em@localhost/edgem'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://em:em@localhost/edgem'

CELERYBEAT_SCHEDULE = {
    'rerun': {
        'task': 'tasks.rerun_scheduler',
        'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL)
    },
    'automate': {
        'task': 'tasks.automation_scheduler',
        'schedule': timedelta(seconds=20)
    },
    'remove-tokens': {
        'task': 'tasks.token_remover_scheduler',
        'schedule': timedelta(seconds=2 * 24 * 3600)
    },
}

CELERY_TIMEZONE = 'UTC'
```
My tasks.py is:
```python
# tasks.py
from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')


@celery.task
def rerun_scheduler():
    from mainsite.task import check_update_rerun_tasks
    check_update_rerun_tasks()


@celery.task
def automation_scheduler():
    from mainsite.task import automate
    automate()


@celery.task
def token_remover_scheduler():
    from mainsite.auth_service import delete_old_tokens
    delete_old_tokens()
```
Keep in mind that all of the above functions return immediately but launch threads if required.
The threads save objects into the DB by calling `transaction.commit()` after `session.add(object)`.
The problem is that the whole thing works like a gem for only about 30 minutes. After that, `ResourceClosedError: The transaction is closed` errors start happening wherever there is a `transaction.commit()`. I am not sure what the problem is, and I need help troubleshooting.
The reason I import inside the tasks was to get rid of this error. I thought importing every time a task needed to run was a good idea and that I might get a new transaction each time, but that does not seem to be the case.
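Why importing inside the task does not produce a fresh transaction can be shown with a small experiment. Python caches imported modules in `sys.modules`, so an `import` inside a function runs the module's top-level code only the first time; later calls get the already-loaded module back. The sketch writes a throwaway module to a temporary directory. `fake_task_module`, `SESSION` and this `rerun_scheduler` stand-in are hypothetical names, and `SESSION = object()` simply represents anything created once at import time, such as a configured session.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for a module like mainsite.task: it creates its
# "session" object once, when the module body runs.
module_dir = tempfile.mkdtemp()
Path(module_dir, "fake_task_module.py").write_text("SESSION = object()\n")
sys.path.insert(0, module_dir)
importlib.invalidate_caches()  # make sure the new file is visible to the finders


def rerun_scheduler():
    # Import inside the function, as in the question's tasks.py.
    from fake_task_module import SESSION
    return SESSION


first = rerun_scheduler()
second = rerun_scheduler()
print(first is second)                    # the module body ran only once
print("fake_task_module" in sys.modules)  # cached after the first import
```

Both checks print `True`: the second call reuses the cached module, so whatever session or transaction state it created at import time is shared by every later run.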
Answer
In my experience, trying to reuse a session configured for Pyramid (with ZopeTransactionExtension etc.) in a Celery worker results in a terrible, hard-to-debug mess.
ZopeTransactionExtension binds the SQLAlchemy session to Pyramid's request-response cycle (typically together with pyramid_tm): a transaction is started and then committed or rolled back automatically, so you're generally not supposed to call `transaction.commit()` in your own code. If everything goes well, ZTE commits everything; if your code raises an exception, the transaction is rolled back.
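ZTE itself is not part of the standard library, but the rule it applies per request (commit when the unit of work finishes normally, roll back when it raises) can be illustrated with an analogy: a `sqlite3` connection used as a context manager follows the same rule. The table name and values below are made up for the sketch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE token (value TEXT)")

# A unit of work that finishes normally: committed when the block exits.
with conn:
    conn.execute("INSERT INTO token VALUES ('kept')")

# A unit of work that raises: rolled back when the exception leaves the block.
try:
    with conn:
        conn.execute("INSERT INTO token VALUES ('discarded')")
        raise RuntimeError("simulated failure in view code")
except RuntimeError:
    pass

rows = [row[0] for row in conn.execute("SELECT value FROM token")]
print(rows)  # ['kept']
```

The point of the analogy is that the code inside the block never commits by hand; the surrounding machinery decides, which is why an explicit `transaction.commit()` fights with that design.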
With Celery you need to manage SQLAlchemy sessions manually, which ZTE prevents you from doing, so you need to configure your `DBSession` differently.
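Outside Pyramid there is no request to hang a transaction on, so each task has to open, commit or roll back, and close its own session. Below is a minimal sketch of one common way to do that, assuming SQLAlchemy is installed and using in-memory SQLite in place of MySQL. `Result`, `save_result_task` and `session_scope` are hypothetical names, and the sketch uses the default `sessionmaker` settings rather than the `autocommit=True` passed in the answer's worker script.

```python
from contextlib import contextmanager

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+
except ImportError:  # older releases
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Result(Base):
    # Hypothetical model standing in for whatever the background jobs save.
    __tablename__ = "result"
    id = Column(Integer, primary_key=True)
    value = Column(String(50))


engine = create_engine("sqlite://")  # in-memory SQLite, just for the sketch
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)  # plain sessionmaker, no ZopeTransactionExtension


@contextmanager
def session_scope():
    """Open a session, commit on success, roll back on error, always close."""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def save_result_task(value):
    # Hypothetical task body: every run gets its own session and transaction.
    with session_scope() as session:
        session.add(Result(value=value))


save_result_task("first run")
save_result_task("second run")

with session_scope() as session:
    total = session.query(Result).count()
print(total)  # 2
```

Because the session is closed at the end of every task, no transaction outlives the run that opened it.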
Something simple like this would work:
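```python
# db.py (the worker script below imports Base and set_dbsession from "db")
DBSession = None


def set_dbsession(session):
    # Called exactly once at startup, by Pyramid or by the Celery start script.
    global DBSession
    if DBSession is not None:
        raise AttributeError("DBSession has already been set to %s!" % DBSession)
    DBSession = session
```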
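Because `set_dbsession` rebinds a module-level global, code elsewhere has to look the session up as `db.DBSession` when it runs. Doing `from db import DBSession` at import time copies whatever value the name had at that moment, which is `None` if startup has not run yet. The sketch shows the difference with an in-memory stand-in for `db.py`, built with `types.ModuleType` only so that the example fits in one file, and a plain string in place of a real session.

```python
import sys
import types

# Hypothetical in-memory stand-in for the answer's db.py module.
db = types.ModuleType("db")
exec(
    "DBSession = None\n"
    "\n"
    "def set_dbsession(session):\n"
    "    global DBSession\n"
    "    if DBSession is not None:\n"
    "        raise AttributeError('DBSession has already been set to %s!' % DBSession)\n"
    "    DBSession = session\n",
    db.__dict__,
)
sys.modules["db"] = db

# Copies the current value of the name (still None) into this namespace.
from db import DBSession as early_copy

# Startup code runs later; a string stands in for a configured session.
db.set_dbsession("configured session")

print(early_copy)    # None: the copy never sees the update
print(db.DBSession)  # attribute lookup at call time sees the configured value

# A second call is refused, guarding against configuring the session twice.
try:
    db.set_dbsession("another session")
except AttributeError as exc:
    second_call_error = str(exc)
print(second_call_error)
```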
And then, in the Pyramid startup code, you do:
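```python
from sqlalchemy.orm import scoped_session, sessionmaker
from zope.sqlalchemy import ZopeTransactionExtension
from db import set_dbsession
def main(global_config, **settings):
    ...
    set_dbsession(scoped_session(sessionmaker(extension=ZopeTransactionExtension())))
```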
With Celery it's a bit trickier: I ended up creating a custom start script for Celery, in which I configure the session.
In the `worker` egg's `setup.py`:
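```python
# setup.py of the worker egg
from setuptools import setup

setup(
    # name, version, packages, etc. elided
    entry_points="""
    # -*- Entry points: -*-
    [console_scripts]
    custom_celery = worker.celeryd:start_celery
    custom_celerybeat = worker.celeryd:start_celerybeat
    """,
)
```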
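Each `console_scripts` line maps a command name to `module:function`. When the egg is installed, the generated `custom_celery` script roughly imports `worker.celeryd` and calls `start_celery()`, which is what lets the session be configured before Celery's own entry point runs. The standard library's `importlib.metadata.EntryPoint` (Python 3.8+) resolves the same syntax, so the sketch below uses it on a stdlib target as a stand-in, since `worker.celeryd` exists only in the answerer's project.

```python
import os
from importlib.metadata import EntryPoint

# Same "module:attribute" syntax as the console_scripts lines. The real value
# would be "worker.celeryd:start_celery"; "os.path:join" is used here only
# because it can be imported anywhere.
ep = EntryPoint(name="custom_celery", value="os.path:join", group="console_scripts")
target = ep.load()  # imports os.path, then fetches its "join" attribute
print(target is os.path.join)
```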
In `worker/celeryd.py`:
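```python
# worker/celeryd.py
import sqlalchemy as sa
import sqlalchemy.orm  # make sure sa.orm is loaded

# Placeholder settings (assumed; not shown in the answer): load them from your config.
DB_STRING = "mysql://user:password@localhost/dbname"
DB_ECHO = False


def initialize_async_session(db_string, db_echo):
    from db import Base, set_dbsession
    # Plain scoped session without ZopeTransactionExtension.
    session = sa.orm.scoped_session(sa.orm.sessionmaker(autoflush=True, autocommit=True))
    engine = sa.create_engine(db_string, echo=db_echo)
    session.configure(bind=engine)
    set_dbsession(session)
    Base.metadata.bind = engine


def start_celery():
    # Configure the session before Celery loads any task code.
    initialize_async_session(DB_STRING, DB_ECHO)
    import celery.bin.celeryd
    celery.bin.celeryd.main()
```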
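The worker script relies on `scoped_session`, which by default keeps one session per thread. That matters here because the question's task functions start their own threads. Below is a small sketch of that registry behaviour, assuming SQLAlchemy is installed and using in-memory SQLite instead of MySQL (and default session settings rather than `autocommit=True`): within one thread, repeated calls return the same session; another thread gets its own; and `remove()` closes the current thread's session so that the next call starts fresh.

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")  # in-memory SQLite, just for the sketch
DBSession = scoped_session(sessionmaker(bind=engine))

# Within one thread, the registry hands back the same Session object.
same_in_thread = DBSession() is DBSession()

# A different thread gets its own Session from the registry.
other = {}


def worker_thread():
    other["session"] = DBSession()
    DBSession.remove()  # close and discard this thread's session when done


t = threading.Thread(target=worker_thread)
t.start()
t.join()
separate_per_thread = other["session"] is not DBSession()

# remove() closes the current session; the next call builds a new one.
before = DBSession()
DBSession.remove()
after = DBSession()
fresh_after_remove = before is not after

print(same_in_thread, separate_per_thread, fresh_after_remove)  # True True True
```

Calling `DBSession.remove()` when each task or thread finishes is a common way to avoid carrying a stale session into later runs; it is general practice rather than a confirmed diagnosis of the 30-minute failure.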
The general approach you're using, "threads being launched from the view to carry out the backend operations", feels a bit dangerous to me if you ever plan to deploy the application to a production server. A web server often recycles, kills, or creates new "workers", so there is generally no guarantee that any particular process will survive beyond the current request-response cycle. I never tried doing this, though, so maybe you'll be OK :)