将芹菜任务路由到特定队列 [英] Route celery task to specific queue
问题描述
我的服务器上运行着两个独立的celeryd进程,由 supervisor
管理.它们被设置为在单独的队列上侦听,例如:
I have two separate celeryd processes running on my server, managed by supervisor
. They are set to listen on separate queues as such:
[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...
[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...
我的celeryconfig看起来像这样:
And my celeryconfig looks something like this:
from celery.schedules import crontab
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
'default': {
"exchange": "default",
"binding_key": "default",
},
'queue1': {
'exchange': 'queue1',
'routing_key': 'queue1',
},
'queue2': {
'exchange': 'queue2',
'routing_key': 'queue2',
},
}
CELERY_IMPORTS = ('tasks', )
CELERYBEAT_SCHEDULE = {
'first-queue': {
'task': 'tasks.sync',
'schedule': crontab(hour=02, minute=00),
'kwargs': {'client': 'client_1'},
'options': {'queue': 'queue1'},
},
'second-queue': {
'task': 'tasks.sync',
'schedule': crontab(hour=02, minute=00),
'kwargs': {'client': 'client_2'},
'options': {'queue': 'queue1'},
},
}
所有 tasks.sync
任务必须路由到特定队列(因此进展缓慢).但是,当我尝试使用 sync.apply_async(kwargs = {'client':'value'},queue ='queue1')
手动运行任务时,两个芹菜工人都接过任务.如何使任务路由到正确的队列,并且仅由绑定到该队列的工作程序运行?
All tasks.sync
tasks must be routed to a specific queue (and therefore celeryd progress). But when I try to run the task manually with sync.apply_async(kwargs={'client': 'value'}, queue='queue1')
both celery workers pick up the task. How can I make the task route to the correct queue and only be run by the worker that is bound to the queue?
推荐答案
您只在运行一个celerybeat实例吗?
You are only running one celerybeat instance right?
也许您有与此冲突的旧队列绑定?尝试运行 rabbitmqctl list_queues
和 rabbitmqctl list_bindings
,也许重置代理中的数据以从头开始.
Maybe you have old queue bindings that clash with this?
Try running rabbitmqctl list_queues
and rabbitmqctl list_bindings
,
maybe reset the data in the broker to start from scratch.
您在此使用的示例应该可以使用,并且在我刚刚尝试过时对我也适用.
The example you have here should work, and is working for me when I just tried it.
提示:由于您使用与队列名称相同的exchange和binding_key值,您无需在CELERY_QUEUES中明确列出它们.当CELERY_CREATE_MISSING_QUEUES开启(默认情况下),队列将完全按照您的方式自动创建如果您只是执行 celeryd -Q queue1
或将任务发送到未定义的队列中.
Tip: Since you are using the same exchange and binding_key value as the queue name,
you don't have to explicitly list them in CELERY_QUEUES. When CELERY_CREATE_MISSING_QUEUES
is on (which it is by default) the queues will be automatically created exactly like you have
if you just do celeryd -Q queue1
or send a task to a queue that is undefined.
这篇关于将芹菜任务路由到特定队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!