如何使用芹菜任务访问orm? [英] How to access the orm with celery tasks?

查看:219
本文介绍了如何使用芹菜任务访问orm?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图使用sqlalchemy + celery beats翻转数据库中特定类型对象的布尔标志。但是,如何从tasks.py文件访问我的orm?从模型中导入Book
从应用程序导入periodic_task
导入create_celery_app
从模型导入book


 
celery = create_celery_app()
#创建芹菜:http://flask.pocoo.org/docs/0.10/patterns/celery/

#这个任务正常工作
@ celery.task
def celery_send_email(to,subject,template):
with current_app.app_context():
msg =消息(
主题,
收件人= [to],
html = template,
sender = current_app.config ['MAIL_DEFAULT_SENDER']

return mail.send(msg)

#这样做失败
@periodic_task(name ='release_flag',run_every = timedelta(seconds = 10))
def release_flag():
with current_app.app_context():<< < #这一行的文件
books = Book.query.all()<<<<<< #Fails here too
for book in book:
book.read = True
book.save()

我使用celery beat命令来运行这个命令:

celery -A tasks worker -l INFO --beat



但是,我收到以下错误:

$ $ $ $ $ $ $> RuntimeError('working )
RuntimeError:在应用程序上下文之外工作

如果我删除current_app.app_context()行,我会得到以下错误:

  RuntimeError:应用程序未注册到数据库实例,没有绑定到当前上下文的应用程序

有没有一种特殊的方法可以访问用于芹菜任务的flask-sqlalchemy orm?还是会有一个更好的方法,我想要做什么?

到目前为止,唯一的解决方法是将 db.init_app(app)在我的应用程序工厂模式中:

db.app = app

我正在按照这个回购协议创建我的芹菜应用程序 https: //github.com/mattupstate/overholt/blob/master/overholt/factory.py

解决方案

因为 current_app 需要一个应用上下文才能工作,但是你正在尝试使用它来设置一个应用上下文。您需要使用实际的应用程序来设置上下文,然后您可以使用 current_app

 与app.app_context():
#做需要应用程序上下文的东西

或者您可以使用一种模式,如 Flask文档子类 celery.Task ,所以它默认了解应用程序上下文。

 来自芹菜导入Celery 

def make_celery(app):
celery = Celery(app.import_name,broker = app.config ['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
$ b $ class ContextTask(TaskBase):
abstract = True

def __call__ (self,* args,** kwargs):
with app.app_context():
返回TaskBase .__调用__(self,* args,** kwargs)

芹菜。 Task = ContextTask
返回芹菜

芹菜= make_celery(app)


I'm trying to flip a boolean flag for particular types of objects in my database using sqlalchemy+celery beats. But how do I access my orm from the tasks.py file?

from models import Book
from celery.decorators import periodic_task
from application import create_celery_app

celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/

# This task works fine
@celery.task
def celery_send_email(to,subject,template):
    with current_app.app_context():
        msg = Message(
            subject,
            recipients=[to],
            html=template,
            sender=current_app.config['MAIL_DEFAULT_SENDER']
        )
        return mail.send(msg)

#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
    with current_app.app_context(): <<< #Fails on this line
        books = Book.query.all() <<<< #Fails here too
        for book in books:
          book.read = True
          book.save()

I'm using celery beat command to run this:

celery -A tasks worker -l INFO --beat

But I'm getting the following error:

raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context

Which points back to the with current_app.app_context() line

If I remove the current_app.app_context() line I will get the following error:

RuntimeError: application not registered on db instance and no application bound to current context

Is there a particular way to access the flask-sqlalchemy orm for celery tasks? Or would there be a better approach to what I'm trying to do?

So far the only workaround which works was to add the following line after db.init_app(app) in my application factory pattern:

db.app = app

I was following this repo to create my celery app https://github.com/mattupstate/overholt/blob/master/overholt/factory.py

解决方案

You're getting that error because current_app requires an app context to work, but you're trying to use it to set up an app context. You need to use the actual app to set up the context, then you can use current_app.

with app.app_context():
    # do stuff that requires the app context

Or you can use a pattern such as the one described in the Flask docs to subclass celery.Task so it knows about the app context by default.

from celery import Celery

def make_celery(app):
     celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
     celery.conf.update(app.config)
     TaskBase = celery.Task

     class ContextTask(TaskBase):
         abstract = True

         def __call__(self, *args, **kwargs):
             with app.app_context():
                 return TaskBase.__call__(self, *args, **kwargs)

     celery.Task = ContextTask
     return celery

 celery = make_celery(app)

这篇关于如何使用芹菜任务访问orm?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆