How to use Flask-SQLAlchemy in a Celery task


Problem description

I recently switched to Celery 3.0. Before that I was using Flask-Celery to integrate Celery with Flask. Although it had many issues, like hiding some powerful Celery functionality, it allowed me to use the full context of the Flask app, especially Flask-SQLAlchemy.

In my background tasks I am processing data and using the SQLAlchemy ORM to store it. The maintainer of Flask-Celery has dropped support for the plugin. The plugin pickled the Flask instance into the task, so I had full access to SQLAlchemy.

I am trying to replicate this behavior in my tasks.py file, but with no success. Do you have any hints on how to achieve this?
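
For context, here is a minimal, hedged sketch of the failing situation described above (the extensions and models modules, the Result model, and the broker URL are assumptions for illustration, not from the question): a plain Celery task that touches the Flask-SQLAlchemy session without the old Flask-Celery wrapper has no application context to work in.

tasks.py

from celery import Celery
from extensions import db      # assumed Flask-SQLAlchemy() instance
from models import Result      # hypothetical model

celery = Celery('tasks', broker='redis://localhost:6379/0')  # assumed broker URL

@celery.task
def store_result(value):
    # Without an active Flask application context this raises a RuntimeError
    # (e.g. "working outside of application context"), which is the behavior
    # the question is trying to get rid of.
    db.session.add(Result(value=value))
    db.session.commit()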

Recommended answer

Update: We've since started using a better way to handle application teardown and setup on a per-task basis, based on the pattern described in the more recent Flask documentation.

extensions.py

import flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
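        # Replace the app's base Task class with one that wraps every task
        # call in a Flask application context when none is already active.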
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask(__name__)
    
    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

Once you've set up your app this way, you can run and use Celery without having to explicitly run it from within an application context: all your tasks will automatically run in an application context when necessary, and you don't have to worry explicitly about post-task teardown, which is an important issue to manage (see the other responses below).
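
As an illustration of what a task module looks like under this setup (a hedged sketch; the User model and the count_users task are assumptions, not part of the answer), tasks only import the shared objects from extensions.py and the ContextTask wrapper handles the application context:

tasks.py

from extensions import celery, db
from models import User   # hypothetical model, assumed for illustration

@celery.task
def count_users():
    # ContextTask pushes an application context if none is active, so
    # Flask-SQLAlchemy's db.session works here without any extra setup.
    return db.session.query(User).count()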

Those who keep getting AttributeError: 'FlaskCelery' object has no attribute 'app' from with _celery.app.app_context(): should make sure to:

  1. Keep the celery import at the app.py file level. Avoid:

app.py

from flask import Flask

def create_app():
    app = Flask(__name__)

    initialize_extensions(app)

    return app

def initialize_extensions(app):
    from extensions import celery, db # DOOMED! Keep celery import at the FILE level
    
    db.init_app(app)
    celery.init_app(app)

  2. Start your celery workers BEFORE you run flask run, and use

celery worker -A app:celery -l info -f celery.log

Note the app:celery here, i.e. it is loaded from app.py.
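
For reference, one possible app.py layout that satisfies both points (a sketch; the module-level app = create_app() call is an assumption, not shown in the answer, but something at import time has to call celery.init_app so that app:celery has its app attribute set):

app.py

from flask import Flask
from extensions import celery, db   # file-level import, so app:celery resolves

def create_app():
    app = Flask(__name__)

    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

# Module-level instance (an assumption): importing app.py now configures the
# celery object, so "celery worker -A app:celery" finds a ready-to-use app.
app = create_app()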

You can still import from extensions to decorate tasks, i.e. from extensions import celery.

I prefer to run all of Celery within the application context by creating a separate file that invokes celery.start() with the application's context. This means your tasks file doesn't have to be littered with context setup and teardowns. It also lends itself well to the Flask 'application factory' pattern.

extensions.py

from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

tasks.py

from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    db.session.remove()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask(__name__)
    
    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app

RunCelery.py

from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()
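
A hedged usage sketch for the setup above: celery.start() hands sys.argv to Celery's own command-line machinery, so the worker can be started through RunCelery.py, and work is enqueued through the usual Celery API (the exact flags below are only an example):

# Start a worker inside the application context:
#
#   python RunCelery.py worker --loglevel=info
#
# Enqueue work from anywhere that can import the task module:
from tasks import do_some_stuff

do_some_stuff.delay()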
