如何在Celery任务中使用Flask-SQLAlchemy [英] How to use Flask-SQLAlchemy in a Celery task

查看:1607
本文介绍了如何在Celery任务中使用Flask-SQLAlchemy的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我最近切换到芹菜3.0。在此之前,我使用 Flask-Celery 将Celery和Flask整合在一起。尽管它隐藏了许多强大的Celery功能,但它允许我使用Flask应用程序的完整上下文,尤其是Flask-SQLAlchemy。

在我的后台任务中,我处理数据和SQLAlchemy ORM来存储数据。 Flask-Celery的维护者已经放弃了对插件的支持。该插件酸洗了Flask实例,因此我可以完全访问SQLAlchemy。

我试图在我的tasks.py文件中复制这个行为,但没有成功。你有什么提示如何实现这一点? 更新:我们已经开始使用更好的方式来处理应用程序拆卸和建立在每个任务的基础上,模式描述的模式在最近flask文档

extensions.py

  import烧瓶
从flask.ext.sqlalchemy导入SQLAlchemy $ b $从芹菜进口芹菜

类FlaskCelery(芹菜):

def __init __(self,* args ,** kwargs):

super(FlaskCelery,self).__ init __(* args,** kwargs)
self.patch_task()

if'app 'kwargs:
self.init_app(kwargs ['app'])

def patch_task(self):
TaskBase = self.Task
_celery = self

类ContextTask(TaskBase):
abstract = True

def __call __(self,* args,** kwargs):
如果flask.has_app_context():
返回TaskBase .__调用__(self,* args,** kwargs)
else:
with _celery.app.app_context():
return TaskBase .__ call __(self,* args,** kwargs)

self.Task = ContextTask

def init_app(self,app):
self.app = app
self.config_from_object(app.config)

$ b celery = FlaskCelery()
db = SQLAlchemy()

app.py

< pre $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $
$ b#configure /初始化所有的扩展名
db.init_app(app)
celery.init_app(app)
$ b $返回应用程序

一旦你这样设置了你的应用程序,你就可以运行和使用芹菜而不必显式地在应用程序上下文中,因为所有的任务将自动运行在应用程序上下文中(如果有必要的话),而且不必担心任务后拆卸,这是一个重要的管理问题(请参阅下面的其他响应)。

旧的答案在下面,仍然有效,但不是一个干净的解决方案



我宁愿运行所有通过创建一个单独的文件来调用celery.start()和应用程序的上下文,从而在应用程序上下文中调用celery.start()。这意味着你的任务文件不需要放置上下文设置和拆卸。它也适用于烧瓶的应用工厂模式。



extensions.py

  from from flask.ext.sqlalchemy从celery import导入SQLAlchemy 
Celery
$ b $ db = SQLAlchemy()
celery = Celery()

tasks.py

 从扩展中导入芹菜,db 
从flask.globals导入current_app $ b $从celery.signals导入task_postrun

@ celery.task
def do_some_stuff():
current_app.logger.info(我有应用程序上下文)
#你现在可以使用db对象从扩展

@ task_postrun.connect
def close_session(* args,** kwargs):
#Flask SQLAlchemy会自动为您创建新的会话
#一个有作用域的会话工厂,因为我们正在维护相同的应用
#上下文,这确保了任务有一个新的会话(例如会话错误
#不会传播任务)
d b.session.remove()

app.py

 从扩展导入芹菜,db 
$ b $ def create_app():
app = Flask()

#配置/初始化所有的扩展
db.init_app(app)
celery.config_from_object(app.config)

return app

RunCelery.py

  from app import create_app 
从扩展导入芹菜

app = create_app()

如果__name__ =='__main__':
with app.app_context():
celery.start()


I recently switch to Celery 3.0. Before that I was using Flask-Celery in order to integrate Celery with Flask. Although it had many issues like hiding some powerful Celery functionalities but it allowed me to use the full context of Flask app and especially Flask-SQLAlchemy.

In my background tasks I am processing data and the SQLAlchemy ORM to store the data. The maintainer of Flask-Celery has dropped support of the plugin. The plugin was pickling the Flask instance in the task so I could have full access to SQLAlchemy.

I am trying to replicate this behavior in my tasks.py file but with no success. Do you have any hints on how to achieve this?

解决方案

Update: We've since started using a better way to handle application teardown and set up on a per-task basis, based on the pattern described in the more recent flask documentation.

extensions.py

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

Once you've set up your app this way, you can run and use celery without having to explicitly run it from within an application context, as all your tasks will automatically be run in an application context if necessary, and you don't have to explicitly worry about post-task teardown, which is an important issue to manage (see other responses below).

Old answer below, still works, but not as clean a solution

I prefer to run all of celery within the application context by creating a separate file that invokes celery.start() with the application's context. This means your tasks file doesn't have to be littered with context setup and teardowns. It also lends itself well to the flask 'application factory' pattern.

extensions.py

from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

tasks.py

from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    db.session.remove()

app.py

from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app

RunCelery.py

from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()

这篇关于如何在Celery任务中使用Flask-SQLAlchemy的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆