芹菜不注册任务 [英] Celery does not registering tasks

查看:212
本文介绍了芹菜不注册任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述


您好!我刚开始在Django中使用Celery。我需要定期执行一项任务。在管理界面中,我可以在名为任务(已注册):的下拉列表中看到我的任务。但是当Celery Beat尝试执行它时,会抛出NotRegistered异常。

Hello! I just started to use Celery with Django. I've a task that i need to be periodic. In admin interface I can see my task in dropdown list named "Task (registered):". But when Celery Beat tries to execute it NotRegistered exception is thrown.

Python 3.5.2,Django 1.11.4,Celery 4.1,django-celery-beat 1.1.0, django-celery-results 1.0.1

Python 3.5.2, Django 1.11.4, Celery 4.1, django-celery-beat 1.1.0, django-celery-results 1.0.1


与芹菜相关的settings.py部分:

Part of settings.py related to celery:

CELERY_BROKER_URL = 'amqp://user:*****@192.168.X.X/proj'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Moscow'

celery.py和proj / __ init__.py与文档示例相同。

celery.py and proj/__init__.py are identical to documentation examples.

proj / celery.py:

proj/celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

app.config_from_object(settings, namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

proj / __ init__。 py:

proj/__init__.py:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

tasks.py:

from celery import shared_task
from core.backend.files import SFTP

@shared_task
def load_files_from_sftp():
    ftp = SFTP()
    ftp.get_files()

我得到了以下json结果:
{ exc_message:'core.tasks.load_files_from_sftp', exc_type: NotRegistered}

I get the following json result: {"exc_message": "'core.tasks.load_files_from_sftp'", "exc_type": "NotRegistered"}

如果我尝试从shell使用celery.task.control.inspect(),那里只有debug_task()。只是卡住了! (感谢您的帮助。

If I try to use celery.task.control.inspect() from shell, it is only debug_task() there. Just stuck! ( I'll be grateful for any help.

推荐答案

正如@MuhammadShoaib所写,这只是一件事:

As @MuhammadShoaib wrote, it's just one thing:

from django.apps import apps 

app.config_from_object(settings)
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

而不是

app.autodiscover_tasks()

为什么这不在文档中?...

why this is not in the documentation?...

这篇关于芹菜不注册任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆