如何在没有全局变量的celery任务中设置sqlalchemy会话 [英] how to setup sqlalchemy session in celery tasks with no global variable

查看:302
本文介绍了如何在没有全局变量的celery任务中设置sqlalchemy会话的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

摘要:我想在celery任务中使用sqlalchemy会话,而又不包含包含该会话的全局变量。



我在一个有celery任务的项目中使用SQLAlchemy,而我正在



当前,我已经定义了一个全局变量'session'以及我的celery应用程序设置(celery.py),并带有一个工作程序信号来对其进行设置

  session = scoped_session(sessionmaker())

@ celeryd_init.connect
def configure_workers(sender = None,conf = None,** kwargs):
#加载应用程序配置
#db_uri = conf ['db_uri']
engine = create_engine(db_uri)
session.configure(bind = engine)

在定义任务的模块中,我只需导入'会话并使用它。使用自定义类定义任务,该类将在返回后关闭会话:

  class DBTask(Task):
def after_return (self,* args,** kwargs):
session.remove()

但是效果很好:当使用CELERY_ALWAYS_EAGER = True进行单元测试时,将不会配置会话。到目前为止,我发现的唯一解决方案是在单元测试中运行任务时模拟会话变量:

 与嘲笑。补丁('celerymodule.tasks.session',self.session):
do_something.delay(...)

虽然有效,但我不想这样做。



有什么方法可以建立一个不会成为会话的会话。全局变量,既可以用于正常的异步行为,又可以用于没有CELERY_ALWAYS_EAGER = True的工作程序?

解决方案

关于自定义任务类的官方文档。

>

我修改了用于访问数据库的任务的自定义任务类:

  class DBTask(Task):
_session = None

def after_return(self,* args,** kwargs):
如果self._session不是N一个:
self._session.remove()

@property
def session(self):
如果self._session为None:
_, self._session = _get_engine_session(self.conf ['db_uri'],
verbose = False)

返回self._session

我这样定义任务:

  @ app.task( base = DBTask,bind = True)
def do_stuff_with_db(self,conf,some_arg):
self.conf = conf
事物= self.session.query(Thing).filter_by(arg = some_arg).first()

这样,SQLAlchemy会话将仅为每个芹菜工人创建一次



这可以解决我的单元测试问题,因为SQLAlchemy会话设置现在独立于芹菜工人。 / p>

Summary: I want to use a sqlalchemy session in celery tasks without having a global variable containing that session.

I am using SQLAlchemy in a project with celery tasks, and I'm having

Currently, I have a global variable 'session' defined along with my celery app setup (celery.py), with a worker signal to set it up.

session = scoped_session(sessionmaker())

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    # load the application configuration
    # db_uri = conf['db_uri']
    engine = create_engine(db_uri)
    session.configure(bind=engine)

In the module defining the tasks, I simply import 'session' and use it. Tasks are defined with a custom class that closes the session after returning:

class DBTask(Task):
    def after_return(self, *args, **kwargs):
        session.remove()

That works well, however: when unit testing with CELERY_ALWAYS_EAGER=True, the session won't be configured. The only solution I've found so far is to mock that 'session' variable when running a task in a unit test:

with mock.patch('celerymodule.tasks.session', self.session):
    do_something.delay(...)

While it works, I don't want to do that.

Is there any way to setup a session that will no be a global variable, that will work both for normal asynchronous behavior and without workers with CELERY_ALWAYS_EAGER=True?

解决方案

The answer was right under my nose in the official documentation about custom task classes.

I modified the custom task class that I use for tasks accessing the database:

class DBTask(Task):
    _session = None

    def after_return(self, *args, **kwargs):
        if self._session is not None:
            self._session.remove()

    @property
    def session(self):
        if self._session is None:
            _, self._session = _get_engine_session(self.conf['db_uri'],
                                                   verbose=False)

        return self._session

I define my tasks this way:

@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
    self.conf = conf
    thing = self.session.query(Thing).filter_by(arg=some_arg).first()

That way, the SQLAlchemy session will only be created once for each celery worker process, and I don't need any global variable.

This solves the problem with my unit tests, since the SQLAlchemy session setup is now independant from the celery workers.

这篇关于如何在没有全局变量的celery任务中设置sqlalchemy会话的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆