How to Inspect the Queue Processing a Celery Task
Problem description
I'm currently leveraging Celery for periodic tasks. I am new to Celery. I have two workers running two different queues: one for slow background jobs and one for jobs users queue up in the application.
I am monitoring my tasks on Datadog because it's an easy way to confirm my workers are running appropriately.
What I want to do is after each task completes, record which queue the task was completed on.
@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
    statsd.increment("celery.on_task_publish.start.increment")

    task = celery.tasks.get(sender)
    queue_name = task.queue

    statsd.increment("celery.on_task_publish.increment", tags=[f"{queue_name}:{task}"])
The above function is something that I implemented after researching the Celery docs and some StackOverflow posts, but it's not working as intended: I get the first statsd increment, but the remaining code does not execute.
I am wondering if there is a simpler way to inspect, inside or after each task completes, which queue processed the task.
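For reference, one approach (not from the original post) is to read the queue from the task's own request context via `self.request.delivery_info` inside a bound task. With Celery's default direct routing, the `routing_key` matches the queue name. A minimal sketch, with a hypothetical helper name:

```python
# Helper (hypothetical name) that pulls the queue name out of a Celery
# request's delivery_info dict. With default direct routing, the routing_key
# is the same as the name of the queue the worker consumed the message from.
def queue_from_delivery_info(delivery_info):
    """Return the routing key from delivery_info, or 'unknown' if absent."""
    return (delivery_info or {}).get("routing_key", "unknown")


# Inside a bound task this could be used roughly like this
# (app and statsd are assumed objects from your own setup):
#
#   @app.task(bind=True)
#   def my_task(self):
#       queue_name = queue_from_delivery_info(self.request.delivery_info)
#       statsd.increment("celery.task_completed", tags=[f"queue:{queue_name}"])
```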
Since your question asks whether there is a way to inspect inside/after each task completes, I'm assuming you haven't tried Celery's result backend. You could check out this feature, which is provided by Celery itself: the result backend (task result backend).
It is very useful for storing the results of your Celery tasks.
Read through this => https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-result-backend-settings
Once you get an idea of how to set up this result backend, search for the result_extended key (in the same link) to be able to add queue names to your task return values.
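As a rough sketch of what the relevant settings might look like (the Redis backend URL here is purely an assumption for illustration; use whichever supported backend fits your stack):

```python
# settings.py (sketch) - backend URL is an assumed example, not a recommendation
result_backend = "redis://localhost:6379/1"  # where task results are stored
result_extended = True     # also store task name, args, kwargs, worker, retries, queue
task_track_started = True  # report a STARTED state while a task is running
task_acks_late = True      # acknowledge the message only after the task finishes
```

With result_extended enabled, the stored result metadata includes the queue, which can then be read back from the backend.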
A number of options are available for where these results go: an SQL DB, a NoSQL DB, S3, Azure, Elasticsearch, etc.
I have made use of this result-backend feature with Elasticsearch, and this is how my task results are stored:
It is just a matter of adding a few configurations in the settings.py file as per your requirements. It worked really well for my application. And I have a weekly cron that clears only the successful results of tasks, since we don't need those results anymore, so I can see only the failed results (like the one in the image).
These were the main keys for my requirement: task_track_started and task_acks_late, along with result_backend.