使用动态routing_key启动工作程序? [英] Starting worker with dynamic routing_key?
问题描述
我有一个队列,其中包含几种任务类型,并且我需要为特定任务运行worker.类似于:芹菜工人--routing_key task.type1 --app = app"
I have one queue with several task types and I need to run worker for specific task. Something like: 'celery worker --routing_key task.type1 --app=app'
队列配置:
CELERY_QUEUES = (
Queue('myqueue', routing_key='task.#'),
)
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
使用pika任务很容易解决: http://www.rabbitmq.com/tutorials/tutorial-five-python.html ,但是怎么用芹菜做呢?
Using pika task is easy to solve: http://www.rabbitmq.com/tutorials/tutorial-five-python.html but how to do it with celery?
推荐答案
Np,您不能将辅助线程绑定到routing_key.
Np, you can't bind a worker to a routing_key.
- 工作人员使用队列而不是routing_key.
- 生产者发送带有routing_key的消息,该Rabbitmq路由到队列.
皮卡也不可能.
在本教程中,工作人员/消费者将其自己的队列绑定到路由密钥.
In the tutorial the worker/consumer binds its own queue to the routing key.
- 生产者发出带有路由键='info'的日志
- RabbitMQ将丢弃所有它们,直到将队列绑定到该routing_key
- 接收方创建一个队列A并将其绑定到routing_key'info'
- 现在,rabbitmq将带有routing_key'info'的日志路由到队列A,接收者将它们消耗掉
您可以使用芹菜轻松重现此绑定.
You can reproduce this binding easily with celery.
例如,您可以在celery配置文件中完成该操作:
In exemple you can do it in the celery configuration file:
exchange = Exchange('default', type=topic)
CELERY_QUEUES = (
Queue('all_logs', exchange, routing_key='logs.#'),
Queue('info_logs', exchange, routing_key='logs.info')
)
接收所有日志:
$ celery worker -A receive_logs -Q all_logs
仅接收信息"日志(仅具有routing_key = logs.info的消息)
receive only 'info' logs (msg with routing_key=logs.info only)
$ celery worker -A receive_logs -Q info_logs
最后,您启动了一个仅消耗msg且具有特定routing_key的工作程序,这就是您想要的.
In the end you have started a worker that consume only msg with a specific routing_key, which is what you want.
注意:Queue:all_logs和Queue:info_logs中的信息日志都是重复的
note: info logs are duplicated in both Queue:all_logs and Queue:info_logs
您可能对以下内容感兴趣: http://docs.celeryproject.org/en/latest/configuration.html?highlight=direct#std:setting-CELERY_WORKER_DIRECT
You might be interested by: http://docs.celeryproject.org/en/latest/configuration.html?highlight=direct#std:setting-CELERY_WORKER_DIRECT
这篇关于使用动态routing_key启动工作程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!