Celery-在task.py中导入模型 [英] Celery - importing models in tasks.py

查看:81
本文介绍了Celery-在task.py中导入模型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在访问我的task.py中的模型时遇到问题

I'm having an issue getting access to models in my tasks.py

我的目标是在应用程序的各个部分(用户注册,重置密码等)发送电子邮件.为此,我将用户ID传递给一个名为"send_email"的芹菜任务.

My goal is to send an email at various parts of the application (user registration, reset password etc..). To do this I pass the user id(s) to a celery task called 'send_email'.

@shared_task()
def send_email(sender_id=None, receiver_id=None, type=None, message=None):

    sender = User.objects.get(id=sender_id)
    receiver = User.objects.get(id=receiver_id)

    logger.info("Starting send email")
    Email.send_email(sender, receiver, type, message)
    logger.info("Finished send email")

然后,任务需要使用ID来检索用户并向他们发送电子邮件.尝试将用户模型导入task.py文件时,这种方法会崩溃.

The task then needs to use the id to retrieve the user and send them an email. This breaks down when trying to import the User model into the tasks.py file.

我收到一个错误

raise AppRegistryNotReady("Apps aren't loaded yet.") django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

我尝试过的事情

  1. 在task.py文件顶部调用django.setup()-原因

  1. calling django.setup() at the top of tasks.py file - causes

raise RuntimeError("populate() isn't reentrant") 

放入send_email方法时也会引起相同的错误.这些是关于SO中其他类似问题的建议

causes the same error when put in send_email method as well. These were suggestions on other similar questions in SO

以"send_email"方法导入模型,允许工作程序启动,但会导致以下错误

Importing model in 'send_email' method, allows the worker to start but causes the following error

raise AppRegistryNotReady("Apps aren't loaded yet.") 

这是关于SO中类似问题的另一条建议

This was another suggestion on a similar question in SO

在调用有效的"send_email"函数时删除.delay(在tasks.py文件顶部或send_email方法中使用导入),但是由于该任务不再异步,因此没有任何好处,但可能会缩小解决问题?

Removing .delay when calling the 'send_email' function which works (with imports at top of tasks.py file or in send_email method) but as the task is no longer async it is not of any benefit but perhaps narrows down the issue?

注意事项?

  1. 我使用了扩展AbstractBaseUser的自定义用户模型,我在celery中看到了许多与此相关的github问题,但我认为这些问题应该在celery v3.1中得到解决
  2. 我使用的是celery v4.1,django 1.11.10,python 2.7,并且正在使用RabbitMQ作为Broker,并在虚拟环境中运行worker/server.我正在使用

  1. I use a custom user model that extends AbstractBaseUser, I have seen a number of github issues in celery relating to this but these were meant to be fixed in celery v3.1 or so I believe
  2. I'm on celery v4.1, django 1.11.10, python 2.7 and am using RabbitMQ as Broker and running worker / server on a virtual env. I'm starting my worker using

celery -A api worker -l info

在终端窗口上,然后使用pycharm的终端通过

on a terminal window and then using pycharm's terminal to start server using

python manage.py runserver

那么实际上有2个环境吗?这可能是问题吗?

so effectively there are 2 envs? Could this be the issue?

这可能相关,也可能不相关,但是为了使自定义用户模型在我的app/models.py中起作用,我只有一行输入了User模型,否则我会得到

This may or not be related but in order to get my custom user model to work in my app/models.py I just have a single line that imports the User model otherwise I get a

django.core.exceptions.ImproperlyConfigured: AUTH_USER_MODEL refers to model 'myApi.User' that has not been installed

  • 我尝试将auth模型设置为'myApi.user.User'(用户是声明模型的文件夹,但得到

  • I tried to set the auth model to 'myApi.user.User' (user being the folder where model is declared but get a

    Invalid model reference 'myApi.user.User'. String model references must be of the form 'app_label.ModelName'.
    

    所以我猜这就是为什么需要在myApi/models.py中导入,以便可以在这里提取?

    so I'm guessing this is why the import is needed in myApi/models.py so that it can be picked up here?

    项目结构

    ├── api
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── celerySettings.py # my celery.py
    ├── db.sqlite3
    ├── myApi
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── tasks.py
    │   ├── urls.py
    │   ├── user
    │   │   ├── __init__.py
    │   │   ├── managers.py
    │   │   ├── models.py
    │   │   ├── serializers.py
    │   │   ├── urls.py
    │   │   └── views.py
    │   ├── utils
    │   │   └── Email.py
    │   ├── views.py
    ├── manage.py
    └── static
    

    tasks.py

    from __future__ import absolute_import, unicode_literals
    from celery.schedules import crontab
    from celery.task import periodic_task
    from celery.utils.log import get_task_logger
    from celery import shared_task
    
    from celery import current_app
    
    from .user.models import User
    from .utils import Email
    
    logger = get_task_logger(__name__)
    
    @shared_task()
    def send_email(sender_id=None, receiver_id=None, type=None, message=None):
    
        sender = User.objects.get(id=sender_id)
        receiver = User.objects.get(id=receiver_id)
    
        logger.info("Starting send email")
        Email.send_email(sender, receiver, type, message)
        logger.info("Finished send email")
    

    settings.py

    ....
    INSTALLED_APPS = [
        'rest_framework',
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'corsheaders',
        'myApi',
        'celery',
        'rest_framework.authtoken',
        'rest_framework.renderers',
    ]
    
    AUTH_USER_MODEL = 'myApi.User'
    CELERY_IMPORTS = ('api.myApi.tasks')
    ....
    

    celerySettings.py

    from __future__ import absolute_import, unicode_literals
    from django.conf import settings
    import os
    from celery import Celery
    
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.api.settings')
    
    app = Celery('api', broker='amqp://')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object(settings, namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    myApi/models.py

    from user.models import User
    

    myApi/admin.py

    # -*- coding: utf-8 -*-
    from __future__ import unicode_literals
    
    from django.contrib import admin
    
    from user.models import User
    
    admin.site.register(User)
    

    api/wsgi.py

    import os
    
    from django.core.wsgi import get_wsgi_application
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.settings")
    
    application = get_wsgi_application()
    

    任何建议将不胜感激.同样很长的帖子也很抱歉,这是我的第一篇,所以不确定需要多少细节.

    Any suggestions would be greatly appreciated. Also sorry for the long post, it is my first one so wasn't sure how much detail was needed.

    推荐答案

    我发现了我的问题.万一它可以帮助其他人解决这个问题,我需要添加行

    I found my issue. In case it helps anyone else stuck on this I needed to add the line

    sys.path.append(os.path.abspath('api'))
    

    在我的celerySettings.py中获取要提取的模型.

    in my celerySettings.py for models to be picked up.

    所以现在看起来像这样

    from __future__ import absolute_import, unicode_literals
    from django.conf import settings
    import os, sys
    from celery import Celery
    
    sys.path.append(os.path.abspath('api'))
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.api.settings')
    
    app = Celery('api', broker='amqp://')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object(settings, namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    然后,当我尝试在本地查询数据库中的模型时,我遇到了另一个问题.Celery说我的数据库表不存在,因为这是在实际本地数据库文件的上方一个文件夹中创建了一个新数据库,要修复它,我只需要更改数据库名称

    I then ran into another issue when actually trying to query the Database for my models locally. Celery was saying that my database tables did not exists, this was because it was creating a new database one folder above where the actual local database file was, to fix it I just had to change database name

    "db.sqlite3"
    

    os.path.join(os.path.dirname(__file__), "db.sqlite3")
    

    在settings.py

    in settings.py

    有效地将其更改为

    api/db.sqlite3
    

    芹菜

    希望这对其他人有所帮助,因为我花了太多时间来解决这个问题.

    Hopefully this helps someone else as I spent far too much time struggling this issue.

    这篇关于Celery-在task.py中导入模型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆