Django&芹菜 - 路由问题 [英] Django & Celery — Routing problems

查看:122
本文介绍了Django&芹菜 - 路由问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Django和Celery,我正在尝试将路由设置为多个队列。当我指定任务的 routing_key exchange (在任务装饰器中或使用 apply_async )),任务不会添加到代理(这是Kombu连接到我的MySQL数据库)。

I'm using Django and Celery and I'm trying to setup routing to multiple queues. When I specify a task's routing_key and exchange (either in the task decorator or using apply_async()), the task isn't added to the broker (which is Kombu connecting to my MySQL database).

如果我指定队列任务装饰器中的名称(这将意味着路由键被忽略),任务工作正常。这似乎是路由/交换设置的一个问题。

If I specify the queue name in the task decorator (which will mean the routing key is ignored), the task works fine. It appears to be a problem with the routing/exchange setup.

任何想法可能是什么问题?

Any idea what the problem could be?

这是设置:

settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

tasks.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

启动任务:

from tasks import my_important_task
my_important_task.delay()


推荐答案

您正在使用Django ORM作为代理,这意味着声明只存储在内存中b $ b(请参阅 http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison

You are using the Django ORM as a broker, which means declarations are only stored in memory (see the, inarguably hard to find, transport comparison table at http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison)

所以当你应用这个任务with route_key important_task.update 它将不能
路由它,因为它还没有声明队列。

So when you apply this task with routing_key important_task.update it will not be able to route it, because it hasn't declared the queue yet.

如果你这样做,它将工作:

It will work if you do this:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")






但是会使用自动路由功能更简单,
,因为这里没有任何显示您需要使用主题交换,
使用自动路由只需删除设置


  • CELERY_DEFAULT_QUEUE

  • code> CELERY_DEFAULT_EXCHANGE ,

  • CELERY_DEFAULT_EXCHANGE_TYPE

  • CELERY_DEFAULT_ROUTING_KEY

  • CELERY_QUEUES

  • CELERY_DEFAULT_QUEUE,
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

并声明您的任务如下:

@task(queue="important")
def important_task():
    return "IMPORTANT"

然后启动从该队列消耗的工作人员:

and then to start a worker consuming from that queue:

$ python manage.py celeryd -l info -Q important

或从默认(芹菜)队列中消费和重要队列:

or to consume from both the default (celery) queue and the important queue:

$ python manage.py celeryd -l info -Q celery,important






另一个好的做法是不要将队列名称硬编码到
任务中,而是使用 CELERY_ROUTES

@task
def important_task():
    return "DEFAULT"

然后在您的设置中:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}






如果您仍然坚持使用主题交换,那么你可以
添加这个路由器自动声明所有队列第一次
a任务发送:


If you still insist on using topic exchanges then you could add this router to automatically declare all queues the first time a task is sent:

class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.itervalues():
                    queue(channel).declare()
            else:
                from kombu.common import entry_to_queue
                for name, opts in queues.iteritems():
                    entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )

这篇关于Django&芹菜 - 路由问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆