如何在运行时创建 celery 队列,以便发送到该队列的任务被工作人员拾取? [英] How do I create celery queues on runtime so that tasks sent to that queue gets picked up by workers?

查看:16
本文介绍了如何在运行时创建 celery 队列,以便发送到该队列的任务被工作人员拾取?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 django 1.4、celery 3.0、rabbitmq

I'm using django 1.4, celery 3.0, rabbitmq

为了描述这个问题,我在一个系统中有许多内容网络,我想要一个队列来处理与每个网络相关的任务.

To describe the problem, I have many content networks in a system and I want a queue for processing tasks related to each of these network.

但是,当系统运行时,内容是动态创建的,因此我需要动态创建队列并让现有工作人员开始处理它们.

However content is created on the fly when the system is live and therefore I need to create queues on the fly and have existing workers start picking up on them.

我已经尝试通过以下方式调度任务(其中内容是 django 模型实例):

I've tried scheduling tasks in the following way (where content is a django model instance):

queue_name = 'content.{}'.format(content.pk)
# E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
add_content.apply_async(args=[content], queue=queue_name)

这将创建一个名为 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 的队列,并创建一个名为 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 的新交换code> 和路由键 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 并将任务发送到该队列.

This create a queue with name content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba, creates a new exchange with name content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba and routing key content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba and sends a task to that queue.

但是我从来没有看到工人接手这些任务.我当前设置的工人没有监听任何特定的队列(未使用队列名称初始化)并接手发送到默认队列就好了.我的 Celery 设置是:

However I never see the workers picking up on these tasks. Workers that I have currently set up are not listening to any specific queues (not initialized with queue names) and pick up tasks sent to the default queue just fine. My Celery settings are:

BROKER_URL = "amqp://test:password@localhost:5672/vhost"
CELERY_TIMEZONE = 'UTC'
CELERY_ALWAYS_EAGER = False

from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, Exchange(CELERY_DEFAULT_EXCHANGE),
        routing_key=CELERY_DEFAULT_ROUTING_KEY),
)

CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1

知道如何让工作人员接手发送到这个新创建队列的任务吗?

Any idea how I can get the workers to pick up on tasks sent to this newly created queue?

推荐答案

你需要告诉workers开始消费新的队列.相关文档在这里.

You need to tell the workers to start consuming the new queues. Relevant docs are here.

从命令行:

$ celery control add_consumer content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba

或者从python内部:

Or from within python:

>>> app.control.add_consumer('content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba', reply=True)

两种形式都接受目标参数,因此如果需要,您可以只告诉个别工作人员有关新队列的信息.

Both forms accept a destination argument, so you can tell individual workers only about the new queues if required.

这篇关于如何在运行时创建 celery 队列,以便发送到该队列的任务被工作人员拾取?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆