将芹菜任务路由到特定队列 [英] Route celery task to specific queue

查看:61
本文介绍了将芹菜任务路由到特定队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的服务器上运行着两个独立的celeryd进程,由 supervisor 管理.它们被设置为在单独的队列上侦听,例如:

I have two separate celeryd processes running on my server, managed by supervisor. They are set to listen on separate queues as such:

[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...

[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...

我的celeryconfig看起来像这样:

And my celeryconfig looks something like this:

from celery.schedules import crontab

BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        "exchange": "default",
        "binding_key": "default",
    },
    'queue1': {
        'exchange': 'queue1',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2',
        'routing_key': 'queue2',
    },
}

CELERY_IMPORTS = ('tasks', )

CELERYBEAT_SCHEDULE = {
    'first-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_1'},
        'options': {'queue': 'queue1'},
    },
    'second-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_2'},
        'options': {'queue': 'queue1'},
    },
}

所有 tasks.sync 任务必须路由到特定队列(因此​​进展缓慢).但是,当我尝试使用 sync.apply_async(kwargs = {'client':'value'},queue ='queue1')手动运行任务时,两个芹菜工人都接过任务.如何使任务路由到正确的队列,并且仅由绑定到该队列的工作程序运行?

All tasks.sync tasks must be routed to a specific queue (and therefore celeryd progress). But when I try to run the task manually with sync.apply_async(kwargs={'client': 'value'}, queue='queue1') both celery workers pick up the task. How can I make the task route to the correct queue and only be run by the worker that is bound to the queue?

推荐答案

您只在运行一个celerybeat实例吗?

You are only running one celerybeat instance right?

也许您有与此冲突的旧队列绑定?尝试运行 rabbitmqctl list_queues rabbitmqctl list_bindings ,也许重置代理中的数据以从头开始.

Maybe you have old queue bindings that clash with this? Try running rabbitmqctl list_queues and rabbitmqctl list_bindings, maybe reset the data in the broker to start from scratch.

您在此使用的示例应该可以使用,并且在我刚刚尝试过时对我也适用.

The example you have here should work, and is working for me when I just tried it.

提示:由于您使用与队列名称相同的exchange和binding_key值,您无需在CELERY_QUEUES中明确列出它们.当CELERY_CREATE_MISSING_QUEUES开启(默认情况下),队列将完全按照您的方式自动创建如果您只是执行 celeryd -Q queue1 或将任务发送到未定义的队列中.

Tip: Since you are using the same exchange and binding_key value as the queue name, you don't have to explicitly list them in CELERY_QUEUES. When CELERY_CREATE_MISSING_QUEUES is on (which it is by default) the queues will be automatically created exactly like you have if you just do celeryd -Q queue1 or send a task to a queue that is undefined.

这篇关于将芹菜任务路由到特定队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆