Celery / Django Single Tasks being run multiple times

I'm facing an issue where I'm placing a task into the queue and it is being run multiple times. From the celery logs I can see that the same worker is running the task ...

[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success

However, when I view the actual logs of the application, it shows 5-6 log lines for each step (??).

I'm using Django 1.6 with RabbitMQ. Tasks are placed into the queue by calling .delay() on a function.

This function (which has the task decorator added) then calls a class which is run.

Does anyone have any idea of the best way to troubleshoot this?

Edit: As requested, here's the code.

views.py

In my view I'm sending my data to the queue via ...

from input.tasks import add_queue_project

add_queue_project.delay(data)

tasks.py

from celery.decorators import task

@task()
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")

    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()

class project_runner():
    """ main project runner """

    def __init__(self,data):
        self.data = data
        self.logger = logging_setup(app="project")

    def main(self):
        .... Code

settings.py

THIRD_PARTY_APPS = (
    'south',  # Database migration helpers:
    'crispy_forms',  # Form layouts
    'rest_framework',
    'djcelery',
)

import djcelery
djcelery.setup_loader()

BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""

CELERY_IMPORTS = ("input.tasks", )

celeryd

The line I'm running to start Celery is:

python2.7 manage.py celeryd -l info

Thanks,

Solution

I don't have an exact answer for you, but there are a few things you should look into:

  • djcelery is deprecated, so if you are using a new version of Celery there may be some sort of conflict.

  • If your input app is listed in INSTALLED_APPS, Celery will discover its tasks on its own, so you don't need to add it to CELERY_IMPORTS = ("input.tasks", ). This may be the cause of your problem, since the same task can end up registered more than once under different module paths.

  • Try giving your task an explicit name, e.g. @task(name='input.tasks.add_queue_project'); Celery will then know it is the same task no matter how you import it (see the sketch after this list).
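
A minimal sketch of the naming suggestion, keeping the import style from the tasks.py in the question (the task body is elided):

from celery.decorators import task

@task(name='input.tasks.add_queue_project')
def add_queue_project(data):
    ...

You can then ask a running worker which tasks it has registered; each task should appear exactly once (this uses the djcelery manage.py integration from your current setup):

python2.7 manage.py celery inspect registered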

Looking at your settings it looks like you are using an old version of Celery, or an old configuration with a new version of Celery. In either case, make sure you have the newest version and try this configuration instead of what you have:

BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
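
With the broker values from the settings.py in the question, that BROKER_URL would read:

BROKER_URL = 'amqp://test:test@127.0.0.1:5672/test'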

Now you will also have to configure Celery differently:

Get rid of djcelery stuff completely.

Create proj/celery.py inside your Django project:

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

In your proj/__init__.py:

from __future__ import absolute_import

from proj.celery import app as celery_app

Then, if your input app is a reusable app that is not part of your project, use the @shared_task decorator instead of @task.
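
As a sketch, the add_queue_project task from the question would then become the following (logging_setup and project_runner are the helpers from your own code; shared_task can be imported from the top-level celery package in recent versions):

from celery import shared_task

@shared_task(name='input.tasks.add_queue_project')
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")

    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()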

Then run celery:

celery -A proj worker -l info
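
Once the worker is up, you can sanity-check the setup from a Django shell; the debug_task defined above should produce exactly one "Request: ..." line in the worker log per call:

python manage.py shell
>>> from proj.celery import debug_task
>>> debug_task.delay()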

Hope it helps.
