Django/Celery multiple queues on localhost - routing not working


Problem description

I followed the Celery docs to define two queues on my dev machine.

My Celery settings:

from kombu import Exchange, Queue  # assumed import; Queue/Exchange come from kombu

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

I opened two terminal windows in my project's virtualenv and ran the following commands:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n default_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

What I get is that all tasks are being processed by both workers.

My goal is to have one queue process only the single task defined in CELERY_ROUTES, and the default queue process all other tasks.

I also followed this SO question. rabbitmqctl list_queues returns celery 0, and running rabbitmqctl list_bindings returns exchange celery queue celery [] twice. Restarting the RabbitMQ server didn't change anything.

Answer

OK, so I figured it out. Below is my whole setup, my settings, and how to run Celery, for those who might be wondering about the same thing my question did. Compared with the question, the route now points at the existing feeds queue with a matching routing key, and CELERY_ALWAYS_EAGER is gone (eager mode executes tasks locally and never touches the broker, so routing settings have no effect).

Settings

from kombu import Exchange, Queue  # assumed import; Queue/Exchange come from kombu

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}
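
For completeness: the commands below assume a project-level Celery app named proj (the name comes from the -A proj flag). The post doesn't show that module, but a typical Celery 3.x Django wiring looks like the sketch below; the settings-module path proj.settings is an assumption.

# proj/celery.py - a sketch of the app module that `-A proj` assumes;
# this follows the standard Celery 3.x Django pattern, not code from the post.
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# Ensure Django settings (including the CELERY_* options above) are loaded.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')  # assumed path

app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)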

How to run Celery?

Terminal tab 1:

celery -A proj worker -Q default -l debug -n default_worker

This will start the first worker, consuming tasks from the default queue. NOTE! -n default_worker is not a must for the first worker, but it is a must if you have any other Celery instances up and running. Setting -n worker_name is the same as --hostname=worker_name@%h.

Terminal tab 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

This will start a second worker that consumes tasks from the feeds queue. Note the -n feeds_worker: if you are running with -l debug (log level = debug), you will see that the two workers are syncing with each other.

Terminal tab 3:

celery -A proj beat -l debug

This will start the beat, executing tasks according to the schedule in your CELERYBEAT_SCHEDULE. I didn't have to change the task or the CELERYBEAT_SCHEDULE.

For example, this is how my CELERYBEAT_SCHEDULE looks for the task that should go to the feeds queue:

from celery.schedules import crontab  # assumed import for the crontab schedule below

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}
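
For contrast only (not from the post): if CELERY_ROUTES were not set, the schedule entry itself could pin the queue, since a beat entry's 'options' dict is passed through to apply_async():

# Hypothetical alternative - unnecessary here, because CELERY_ROUTES already
# maps 'arena.social.tasks.Update' to the feeds queue.
CELERYBEAT_SCHEDULE = {
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
        'options': {'queue': 'feeds', 'routing_key': 'long_tasks'},
    },
}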

As you can see, with the routes configured there is no need to add 'options': {'routing_key': 'long_tasks'} or to specify which queue the task should go to. Also, if you were wondering why Update is capitalized, it's because it is a custom task, defined as a subclass of celery.Task.
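
The post doesn't include the task class itself. For illustration, a subclass along these lines matches the dotted name used above; the module path arena.social.tasks and the class name Update come from the settings, while the body is an assumption:

# arena/social/tasks.py - hypothetical sketch; the real task body isn't shown.
from celery import Task

class Update(Task):
    """Feed-update task; CELERY_ROUTES sends it to the 'feeds' queue by name."""

    def run(self, *args, **kwargs):
        pass  # placeholder - the actual update logic is not in the post

# Note: Celery 3.x registers non-abstract Task subclasses automatically;
# Celery 4+ would need explicit registration, e.g. app.register_task(Update()).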
