如何使用Celery和Django将任务路由到不同的队列 [英] How to route tasks to different queues with Celery and Django

查看:307
本文介绍了如何使用Celery和Django将任务路由到不同的队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用以下堆栈:


  • Python 3.6

  • Celery v4.2.1 (经纪人: RabbitMQ v3.6.0

  • Django v2.0.4

  • Python 3.6
  • Celery v4.2.1 (Broker: RabbitMQ v3.6.0)
  • Django v2.0.4.

根据 Celery的文档,在不同的队列上运行计划的任务应该很容易,就像在 CELERY_ROUTES 上为任务定义相应的队列一样,尽管如此,所有任务似乎都在Celery的计算机上执行默认队列。

According Celery's documentation, running scheduled tasks on different queues should be as easy as defining the corresponding queues for the tasks on CELERY_ROUTES, nonetheless all tasks seem to be executed on Celery's default queue.

这是 my_app / settings.py 上的配置:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },

}

这些任务只是用于测试路由的简单脚本:

The tasks are just simple scripts for testing routing:

文件 app1 / tasks.py

from my_app.celery import app
import time


@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

文件 app2 / tasks.py

from my_app.celery import app
import time


@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)

当我使用所有必需的队列运行Celery时:

When I run Celery with all the required queues:

celery -A my_app worker -B -l info -Q celery,queue1,queue2

RabbitMQ将显示只有默认队列 celery 正在运行任务:

RabbitMQ will show that only the default queue "celery" is running the tasks:

sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0

有人知道如何解决此意外行为吗?

Does somebody know how to fix this unexpected behavior?

致谢

推荐答案

我已经开始使用了,这里没有什么要注意的:

I have got it working, there are few things to note here:

根据 Celery 4.2.0文档 CELER Y_ROUTES 应该是用于定义队列路由的变量,但是它仅对我来说使用 CELERY_TASK_ROUTES 起作用。任务路由似乎独立于Celery Beat,因此仅适用于手动计划的任务:

According Celery's 4.2.0 documentation, CELERY_ROUTES should be the variable to define queue routing, but it only works for me using CELERY_TASK_ROUTES instead. The task routing seems to be independent from Celery Beat, therefore this will only work for tasks scheduled manually:

app1_test.delay()
app2_test.delay()

app1_test.apply_async()
app2_test.apply_async()

要使其与Celery Beat一起使用,我们只需要在 CELERY_BEAT_SCHEDULE 变量中显式定义队列。文件 my_app / settings.py 的最终设置如下:

To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of the file my_app/settings.py would be as follows:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}

我希望这可以节省一些时间给其他开发人员。

I hope this saves some time to other developers.

这篇关于如何使用Celery和Django将任务路由到不同的队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆