芹菜不注册任务 [英] Celery does not registering tasks
问题描述
您好!我刚开始在Django中使用Celery。我需要定期执行一项任务。在管理界面中,我可以在名为任务(已注册):的下拉列表中看到我的任务。但是当Celery Beat尝试执行它时,会抛出NotRegistered异常。
Hello! I just started to use Celery with Django. I've a task that i need to be periodic. In admin interface I can see my task in dropdown list named "Task (registered):". But when Celery Beat tries to execute it NotRegistered exception is thrown.
Python 3.5.2,Django 1.11.4,Celery 4.1,django-celery-beat 1.1.0, django-celery-results 1.0.1
Python 3.5.2, Django 1.11.4, Celery 4.1, django-celery-beat 1.1.0, django-celery-results 1.0.1
与芹菜相关的settings.py部分:
Part of settings.py related to celery:
CELERY_BROKER_URL = 'amqp://user:*****@192.168.X.X/proj'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Moscow'
celery.py和proj / __ init__.py与文档示例相同。
celery.py and proj/__init__.py are identical to documentation examples.
proj / celery.py:
proj/celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
app.config_from_object(settings, namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
proj / __ init__。 py:
proj/__init__.py:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
tasks.py:
from celery import shared_task
from core.backend.files import SFTP
@shared_task
def load_files_from_sftp():
ftp = SFTP()
ftp.get_files()
我得到了以下json结果:
{ exc_message:'core.tasks.load_files_from_sftp', exc_type: NotRegistered}
I get the following json result: {"exc_message": "'core.tasks.load_files_from_sftp'", "exc_type": "NotRegistered"}
如果我尝试从shell使用celery.task.control.inspect(),那里只有debug_task()。只是卡住了! (感谢您的帮助。
If I try to use celery.task.control.inspect() from shell, it is only debug_task() there. Just stuck! ( I'll be grateful for any help.
推荐答案
正如@MuhammadShoaib所写,这只是一件事:
As @MuhammadShoaib wrote, it's just one thing:
from django.apps import apps
app.config_from_object(settings)
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])
而不是
app.autodiscover_tasks()
为什么这不在文档中?...
why this is not in the documentation?...
这篇关于芹菜不注册任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!