Python,Celery,Flask,“在应用程序上下文之外工作" [英] Python, Celery, Flask, "working outside of application context"

查看:64
本文介绍了Python,Celery,Flask,“在应用程序上下文之外工作"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Celery和Python为Flask应用程序安排任务.我基本上想每隔x倍的时间在另一个目录中运行一个函数,并使其成为一项芹菜任务.我导入了函数test_check,然后尝试将其放在一个名为testcheck()的芹菜任务下,但是,我得到了错误:

I am trying to schedule tasks using Celery and Python for a Flask app. I basically want to run a function in another directory every x amount of time and make it a celery task. I import the function test_check and I try and put it under a celery task called testcheck(), however, I get the error:

在应用程序上下文之外工作

working outside of application context

我该如何解决?这是我的设置:

How can I fix this? Here is my setup:

from app import app
from celery import Celery
from datetime import timedelta
from app.mod_check.views import test_check

celery = Celery(__name__,
             broker='amqp://guest:@localhost/',
             backend='amqp://guest:@localhost/'
             )

celery.config_from_object(__name__)

@celery.task
def add(x, y):
    print "celery working!"
    return x + y

@celery.task
def testcheck():
        test_check()

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks2.testcheck',
        'schedule': timedelta(seconds=5),
        #'args': (16, 16)
    },
}

CELERY_TIMEZONE = 'Europe/London'

推荐答案

无论 test_check 是什么,它都会执行需要请求上下文的操作.由于Celery任务不是HTTP请求/响应周期的一部分,因此您需要手动设置请求上下文.

Whatever test_check is, it does something that needs a request context. Since Celery tasks are not part of the HTTP request/response cycle, you need to set up a request context manually.

with app.test_request_context():
    test_check()

这篇关于Python,Celery,Flask,“在应用程序上下文之外工作"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆