How to Inspect the Queue Processing a Celery Task


Problem Description


I'm currently leveraging Celery for periodic tasks. I am new to Celery. I have two workers running two different queues: one for slow background jobs and one for jobs users queue up in the application.

I am monitoring my tasks on Datadog because it's an easy way to confirm my workers are running appropriately.

What I want to do is, after each task completes, record which queue the task was completed on.

@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
    statsd.increment("celery.on_task_publish.start.increment")

    task = celery.tasks.get(sender)
    queue_name = task.queue

    statsd.increment("celery.on_task_publish.increment", tags=[f"{queue_name}:{task}"])

The function above is something that I implemented after researching the Celery docs and some StackOverflow posts, but it's not working as intended: I get the first statsd increment, but the remaining code does not execute.

I am wondering if there is a simpler way to inspect, inside or after each task completes, which queue processed the task.

Solution

Since your question asks whether there is a way to inspect, inside/after each task completes, which queue processed it - I'm assuming you haven't tried Celery's result backend. You could check out this feature, which is provided by Celery itself: the result backend (also called the task result backend). It is very useful for storing the results of your Celery tasks. Read through https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-result-backend-settings


Once you get an idea of how to set up this result backend, search for the result_extended key (in the same link) to be able to add queue names to your task return values.

A number of options are available - you can configure these results to go to any of the following:

SQL DB / NoSQL DB / S3 / Azure / Elasticsearch / etc.


I have made use of this result-backend feature with Elasticsearch, and this is how my task results are stored:

It is just a matter of adding a few configuration entries in your settings.py file, as per your requirements. It worked really well for my application. And I have a weekly cron that clears only the successful task results - since we don't need the results anymore - so I can see only the failed results (like the one in the image).

These were the main keys for my requirement: task_track_started and task_acks_late, along with result_backend.
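Pulled together, those keys amount to a configuration fragment along these lines (the Elasticsearch URL is an illustrative placeholder):

```python
# settings.py / celeryconfig.py - values shown are placeholders
task_track_started = True   # report the STARTED state while a task is running
task_acks_late = True       # acknowledge the message after the task completes
result_backend = "elasticsearch://localhost:9200/celery/backend"
```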

