如何在运行时创建芹菜队列,以便发送到该队列的任务被工作人员拿走? [英] How do I create celery queues on runtime so that tasks sent to that queue gets picked up by workers?

查看:260
本文介绍了如何在运行时创建芹菜队列,以便发送到该队列的任务被工作人员拿走?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用django 1.4,celery 3.0,rabbitmq

I'm using django 1.4, celery 3.0, rabbitmq

为了描述问题,我在系统中有很多内容网络,我想要一个处理任务的队列与这些网络中的每一个相关。

To describe the problem, I have many content networks in a system and I want a queue for processing tasks related to each of these network.

然而内容是在系统生效时即时创建的,因此我需要即时创建队列,并且现有的工作人员开始采集

However content is created on the fly when the system is live and therefore I need to create queues on the fly and have existing workers start picking up on them.

我以下列方式尝试了调度任务(内容是django模型实例):

I've tried scheduling tasks in the following way (where content is a django model instance):

queue_name = 'content.{}'.format(content.pk)
# E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
add_content.apply_async(args=[content], queue=queue_name)

创建名称为 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 的队列,创建名称为<$ c的新交换$ c> content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 和路由密钥 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 并发送一个任务到这个队列。

This create a queue with name content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba, creates a new exchange with name content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba and routing key content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba and sends a task to that queue.

然而,我从来没有看到工作人员接受这些任务。我目前设置的工作人员不在听到任何特定的队列(没有使用队列名称初始化),并且接收发送到默认队列的任务就好了。我的芹菜设置是:

However I never see the workers picking up on these tasks. Workers that I have currently set up are not listening to any specific queues (not initialized with queue names) and pick up tasks sent to the default queue just fine. My Celery settings are:

BROKER_URL = "amqp://test:password@localhost:5672/vhost"
CELERY_TIMEZONE = 'UTC'
CELERY_ALWAYS_EAGER = False

from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, Exchange(CELERY_DEFAULT_EXCHANGE),
        routing_key=CELERY_DEFAULT_ROUTING_KEY),
)

CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1

任何想法如何让工作人员接收发送到这个新创建的队列的任务?

Any idea how I can get the workers to pick up on tasks sent to this newly created queue?

推荐答案

你需要告诉工作人员开始消费新的队列。相关文件是此处

You need to tell the workers to start consuming the new queues. Relevant docs are here.

从命令行:

$ celery control add_consumer content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba

或从python内:

>>> app.control.add_consumer('content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba', reply=True)

两个表单都接受一个目的地参数,所以您可以根据需要向个别工作人员通知新队列。

Both forms accept a destination argument, so you can tell individual workers only about the new queues if required.

这篇关于如何在运行时创建芹菜队列,以便发送到该队列的任务被工作人员拿走?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆