"ResourceClosedError:交易已关闭".芹菜节拍和sqlalchemy +金字塔应用程序出错 [英] "ResourceClosedError: The transaction is closed" error with celery beat and sqlalchemy + pyramid app

查看:59
本文介绍了"ResourceClosedError:交易已关闭".芹菜节拍和sqlalchemy +金字塔应用程序出错的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个名为 mainsite 的金字塔应用.

I have a pyramid app called mainsite.

该网站以一种非常异步的方式工作,主要是通过从视图启动线程来执行后端操作.

The site works in a pretty asynchronous manner mostly through threads being launched from the view to carry out the backend operations.

它通过sqlalchemy连接到mysql,并使用ZopeTransactionExtension进行会话管理.

It connects to mysql with sqlalchemy and uses ZopeTransactionExtension for session management.

到目前为止,该应用程序一直运行良好.

So far the application has been running great.

我需要在其上运行定期作业,并且它需要使用从视图启动的某些相同的异步函数.

I need to run periodic jobs on it and it needs to use some of the same asynchronous functions that are being launched from the view.

我使用过apscheduler,但遇到了问题.因此,我想到了使用celery beat作为一个单独的过程,该过程将mainapp视为一个库并导入要使用的功能.

I used apscheduler but ran into issues with that. So I thought of using celery beat as a separate process that treats mainapp as a library and imports the functions to be used.

我的芹菜配置如下:

from datetime import timedelta
from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \
    AUTH_DELETE_TIME

BROKER_URL = 'sqla+mysql://em:em@localhost/edgem'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://em:em@localhost/edgem'

CELERYBEAT_SCHEDULE = {
    'rerun': {
        'task': 'tasks.rerun_scheduler',
        'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL)
    },
    'automate': {
        'task': 'tasks.automation_scheduler',
        'schedule': timedelta(seconds=20)
    },
    'remove-tokens': {
        'task': 'tasks.token_remover_scheduler',
        'schedule': timedelta(seconds=2 * 24 * 3600 )
    },
}

CELERY_TIMEZONE = 'UTC'

tasks.py是

from celery import Celery
celery = Celery('tasks')
celery.config_from_object('celeryconfig')


@celery.task
def rerun_scheduler():
    from mainsite.task import check_update_rerun_tasks
    check_update_rerun_tasks()


@celery.task
def automation_scheduler():
    from mainsite.task import automate
    automate()


@celery.task
def token_remover_scheduler():
    from mainsite.auth_service import delete_old_tokens
    delete_old_tokens()

请记住,以上所有功能都会立即返回,但在需要时启动线程

keep in mind that all the above functions immediately return but launch threads if required

线程通过在session.add(object)之后执行 transaction.commit()将对象保存到db中.

The threads save objects into db by doing transaction.commit() after session.add(object).

问题在于整个过程只能像宝石一样工作约30分钟.此后, ResourceClosedError:关闭事务的错误开始发生在任何存在 transaction.commit()的地方.我不确定是什么问题,需要帮助进行故障排除.

The problem is that the whole things works like a gem only for about 30 minutes. After that ResourceClosedError: The transaction is closed errors starts happening wherever there is a transaction.commit(). I am not sure what is the problem and I need help troubleshooting.

我确实导入任务内部的原因是为了消除此错误.认为每次需要运行任务时都导入是一个好主意,每次我可能都会获得一个新事务,但事实并非如此.

The reason I do import inside the tasks was to get rid of this error. Thought importing every time task needed to be run was a good idea and I may get a new transaction each time, but looks like that is not the case.

推荐答案

以我的经验,尝试将配置为与金字塔配合使用的会话(与ZopeTransactionExtension等配合使用)与Celery工人一起使用,会导致严重的难以调试一团糟.

In my experience trying to reuse a session configured to be used with Pyramid (with ZopeTransactionExtension etc.) with a Celery worker results in a terrible hard-to-debug mess.

ZopeTransactionExtension将SQLAlchemy会话绑定到Pyramid的请求-响应周期-事务自动启动并提交或回滚,通常不应在代码中使用transaction.commit()-如果一切正常,中兴通讯将提交所有内容,如果您的代码引发异常,则您的交易将回滚.

ZopeTransactionExtension binds SQLAlchemy session to Pyramid's request-response cycle - a transaction is started and committed or rolled back automatically, you're generally not supposed to use transaction.commit() within your code - if everything is ok ZTE will commit everything, if your code raises and exception your transaction will be rolled back.

使用Celery,您需要手动管理SQLAlchemy会话,而ZTE阻止您这样做,因此您需要以不同的方式配置 DBSession .

With Celery you need to manage SQLAlchemy sessions manually, which ZTE prevents you from doing, so you need to configure your DBSession differently.

像这样简单的事情会起作用:

Something simple like this would work:

DBSession = None

def set_dbsession(session):
    global DBSession
    if DBSession is not None:
        raise AttributeError("DBSession has been already set to %s!" % DBSession)

    DBSession = session

然后从Pyramid启动代码中完成

And then from Pyramid startup code you do

def main(global_config, **settings):
    ...
    set_dbsession(scoped_session(sessionmaker(extension=ZopeTransactionExtension())))

使用Celery有点棘手-我最终为Celery创建了一个自定义启动脚本,在其中配置了会话.

With Celery it's a bit trickier - I ended up creating a custom start script for Celery, in which I configure the session.

worker 鸡蛋的 setup.py 中:

  entry_points="""
  # -*- Entry points: -*-
  [console_scripts]
  custom_celery = worker.celeryd:start_celery
  custom_celerybeat = worker.celeryd:start_celerybeat
  """,
  )

worker/celeryd.py 中:

def initialize_async_session(db_string, db_echo):

    import sqlalchemy as sa
    from db import Base, set_dbsession

    session = sa.orm.scoped_session(sa.orm.sessionmaker(autoflush=True, autocommit=True))
    engine = sa.create_engine(db_string, echo=db_echo)
    session.configure(bind=engine)

    set_dbsession(session)
    Base.metadata.bind = engine


def start_celery():
    initialize_async_session(DB_STRING, DB_ECHO)
    import celery.bin.celeryd
    celery.bin.celeryd.main()

如果您打算将应用程序部署到生产服务器上,则通常将从视图中启动线程以执行后端操作"所用的一般方法对我来说有点危险-Web服务器通常会回收,会杀死或创建新的工作人员",因此通常不能保证每个特定流程都可以在当前请求-响应周期之后继续存在.我从来没有尝试过这样做,所以也许你会没事的:)

The general approach you're using with "threads being launched from the view to carry out the backend operations" feels a bit dangerous to me if you ever plan to deploy the application to a production server - a web server often recycles, kills or creates new "workers" so generally there are no guarantees each particular process would survive beyond the current request-response cycle. I never tried doing this though, so maybe you'll be ok :)

这篇关于"ResourceClosedError:交易已关闭".芹菜节拍和sqlalchemy +金字塔应用程序出错的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆