Celery / Django Single Tasks being run multiple times
Problem description
I'm facing an issue where I'm placing a task into the queue and it is being run multiple times. From the celery logs I can see that the same worker is running the task ...
[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success
However, when I view the actual logs of the application, it is showing 5-6 log lines for each step (??).
I'm using Django 1.6 with RabbitMQ. The method for placing a task into the queue is by calling delay() on a function.
This function (which has the task decorator applied) then calls a class which is run.
Has anyone any idea on the best way to troubleshoot this?
Edit: As requested, here's the code.
views.py
In my view im sending my data to the queue via ...
from input.tasks import add_queue_project
add_queue_project.delay(data)
tasks.py
from celery.decorators import task

@task()
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")
    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()

class project_runner():
    """ main project runner """
    def __init__(self, data):
        self.data = data
        self.logger = logging_setup(app="project")

    def main(self):
        .... Code
settings.py
THIRD_PARTY_APPS = (
'south', # Database migration helpers:
'crispy_forms', # Form layouts
'rest_framework',
'djcelery',
)
import djcelery
djcelery.setup_loader()
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""
CELERY_IMPORTS = ("input.tasks", )
celeryd
The line I'm running to start celery is:
python2.7 manage.py celeryd -l info
Thanks,
I don't have an exact answer for you, but there are a few things you should look into:
- djcelery is deprecated, so if you are using a new version of celery there may be some sort of conflict.
- If your input app is listed in INSTALLED_APPS, celery will discover it, so you don't need to add it to CELERY_IMPORTS = ("input.tasks", ). This may be the cause of your problem, since the tasks could be loaded multiple times.
- Try giving your task a name, e.g. @task(name='input.tasks.add'); then celery will know it is the same task, no matter how you import it.
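The duplicate-loading failure mode behind the second and third points can be illustrated without Celery at all: a registry keyed by auto-derived names (module path plus function name) picks up two entries when the same module is importable under two paths, while an explicit name collapses them into one. A minimal sketch (the registry and decorator here are illustrative stand-ins, not Celery's actual internals):

```python
# Illustrative task registry keyed by name: without an explicit name,
# the same function registered via two import paths creates two entries.
registry = {}

def task(func=None, name=None):
    """Register a function under an explicit name, or fall back to the
    auto-derived '<module>.<function>' name (as Celery does by default)."""
    def decorator(f):
        key = name or "{0}.{1}".format(f.__module__, f.__name__)
        registry[key] = f
        return f
    return decorator(func) if func else decorator

def define_task(module_name, explicit_name=None):
    """Simulate decorating the same function from a given import path."""
    def add(x, y):
        return x + y
    add.__module__ = module_name
    if explicit_name is None:
        task(add)
    else:
        task(name=explicit_name)(add)

define_task("input.tasks")
define_task("tasks")  # same code, reached via a second import path
assert sorted(registry) == ["input.tasks.add", "tasks.add"]  # duplicated!

registry.clear()
define_task("input.tasks", explicit_name="input.tasks.add")
define_task("tasks", explicit_name="input.tasks.add")
assert sorted(registry) == ["input.tasks.add"]  # one entry, one task
```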
Looking at your settings, it looks like you are using an old version of celery, or you are using your old configuration with a new version of celery. In any case, make sure you have the newest version and try this configuration instead of what you have:
BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
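For the settings shown in the question, the separate BROKER_HOST, BROKER_PORT, BROKER_USER, BROKER_PASSWORD and BROKER_VHOST values collapse into a single URL; with the test credentials from the question that would be:

```python
# Single-URL equivalent of the question's separate BROKER_* settings
BROKER_URL = 'amqp://test:test@127.0.0.1:5672/test'
```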
Now, you also will have to configure celery differently:
Get rid of the djcelery stuff completely.
Create proj/celery.py inside your django project:
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
In your proj/__init__.py:
from __future__ import absolute_import
from proj.celery import app as celery_app
Then, if your input app is a reusable app and is not part of your project, use the @shared_task decorator instead of @task.
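For the question's task, that would look roughly like the following in input/tasks.py. This is a sketch only: logging_setup and project_runner are the asker's own helpers and are assumed to be importable, and the explicit name is added per the earlier suggestion.

```python
# input/tasks.py -- sketch using @shared_task for a reusable app,
# with an explicit name so the task registers once regardless of import path
from celery import shared_task

@shared_task(name='input.tasks.add_queue_project')
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")  # asker's helper, assumed available
    logger.info("starting project runner..")
    project_runner(data).main()           # asker's class, assumed available
```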
Then run celery:
celery -A proj worker -l info
Hope it helps.