How to use Flask-SQLAlchemy in a Celery task
Problem description
I recently switched to Celery 3.0. Before that I was using Flask-Celery to integrate Celery with Flask. Although it had many issues, like hiding some powerful Celery functionality, it let me use the full context of the Flask app, and especially Flask-SQLAlchemy.
In my background tasks I process data and use the SQLAlchemy ORM to store it. The maintainer of Flask-Celery has dropped support for the plugin. The plugin pickled the Flask instance into the task, so I had full access to SQLAlchemy.
I am trying to replicate this behavior in my tasks.py file, but with no success. Do you have any hints on how to achieve this?
Update: We've since started using a better way to handle application teardown and setup on a per-task basis, based on the pattern described in the more recent Flask documentation.
extensions.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery


class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flask
from extensions import celery, db


def create_app():
    app = Flask(__name__)

    # configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app
Once you've set up your app this way, you can run and use Celery without having to explicitly run it from within an application context: all your tasks will automatically run in an application context when necessary, and you don't have to explicitly worry about post-task teardown, which is an important issue to manage (see the session-cleanup signal in the old answer below).
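The core trick here is the patched `__call__`: the task body only runs after an application context has been pushed, and an existing context is reused rather than nested. A pure-Python stand-in (no Flask or Celery involved; `FakeContext`, `TaskBase`, and `MyTask` are illustrative names, not real library classes) sketches the same control flow:

```python
# Minimal stand-in for the ContextTask pattern: the base task's __call__
# is wrapped so the task body always executes "inside" a context.

class FakeContext:
    """Plays the role of app.app_context(): tracks whether a context is active."""
    active = 0

    def __enter__(self):
        FakeContext.active += 1
        return self

    def __exit__(self, *exc):
        FakeContext.active -= 1
        return False


class TaskBase:
    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class ContextTask(TaskBase):
    def __call__(self, *args, **kwargs):
        if FakeContext.active:
            # already inside a context: just run the task body
            return super().__call__(*args, **kwargs)
        with FakeContext():
            # otherwise enter a context first, then run
            return super().__call__(*args, **kwargs)


class MyTask(ContextTask):
    def run(self):
        # proves the body observed an active context
        return FakeContext.active


result = MyTask()()
print(result)  # → 1
```

The `if` branch matters: without the `has_app_context()` check, a task invoked synchronously from within a request would push a second, redundant context.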
Old answer below, still works, but not as clean a solution
I prefer to run all of Celery within the application context by creating a separate file that invokes celery.start() inside the application's context. This means your tasks file doesn't have to be littered with context setup and teardown. It also lends itself well to the Flask 'application factory' pattern.
extensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun


@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    # you can now use the db object from extensions


@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask-SQLAlchemy will automatically create new sessions for you from
    # a scoped session factory. Given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors
    # won't propagate across tasks).
    db.session.remove()
app.py
from flask import Flask
from extensions import celery, db


def create_app():
    app = Flask(__name__)

    # configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app
RunCelery.py
from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()