如何在Celery-Django应用程序中监视来自工作人员的事件? [英] How to monitor events from workers in a Celery-Django application?

查看:123
本文介绍了如何在Celery-Django应用程序中监视来自工作人员的事件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据有关实时监控芹菜工人的芹菜教程 ,还可以以编程方式捕获工人产生的事件并采取相应的行动.

According to the celery tutorial regarding real-time monitoring of celery workers, one can also programmatically capture the events produced by the workers and take action accordingly.

我的问题是如何在例如,在Celery-Django应用程序中?

My question is how can I integrate a monitor as the one in this example, in a Celery-Django application?

本教程中的代码示例如下:

The code example in the tutorial looks like:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        task_id = event['uuid']

        print('TASK FAILED: %s[%s] %s' % (
            event['name'], task_id, state[task_id].info(), ))
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                'worker-heartbeat': announce_dead_workers,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    celery = Celery(broker='amqp://guest@localhost//')
    my_monitor(celery)

因此,我想捕获工作程序发送的task_failed事件,并按照教程所示获取其task_id,以便从为我的应用程序配置的结果后端中获取此任务的结果,并对其进行进一步处理.我的问题是,如何获得应用程序对我而言并不明显,因为在django-celery项目中,对Celery库的实例化对我而言并不透明.

So I want to capture task_failed event sent by the worker, and to get its task_id like the tutorial shows, to get the result for this task from the result-backend that was configured for my application and process it further. My problem is that it is not obvious to me how to get the application, as in a django-celery project it is not transparent to me the instantiation of Celery library.

对于工人完成任务后如何处理结果,我也持开放态度.

I am also open to any other idea as to how to process the results when a worker has finished executing a task.

推荐答案

好吧,我发现了一种解决方法,尽管我不确定这是否是解决方案,但对我有用.监视功能基本上直接连接到代理,并侦听不同类型的事件.我的代码如下:

Ok, I found a way of doing this, though I am not sure that this is the solution, but it works for me. The monitor function basically connects directly to the broker and listens to different types of events. My code looks like this:

from celery.events import EventReceiver
from kombu import Connection as BrokerConnection

def my_monitor:
    connection = BrokerConnection('amqp://guest:guest@localhost:5672//')

    def on_event(event):
        print "EVENT HAPPENED: ", event

    def on_task_failed(event):
        exception = event['exception']
        print "TASK FAILED!", event, " EXCEPTION: ", exception

    while True:
        try:
            with connection as conn:
                recv = EventReceiver(conn,
                                 handlers={'task-failed' : on_task_failed,
                                           'task-succeeded' : on_event,
                                           'task-sent' : on_event,
                                           'task-received' : on_event,
                                           'task-revoked' : on_event,
                                           'task-started' : on_event,
                                           # OR: '*' : on_event
                                           })
            recv.capture(limit=None, timeout=None)
    except (KeyboardInterrupt, SystemExit):
        print "EXCEPTION KEYBOARD INTERRUPT"
        sys.exit()

这就是全部.而且我在与正常应用程序不同的过程中运行此程序,这意味着我创建了仅运行此功能的celery应用程序的子进程. HTH

This is all. And I run this in a different process than the normal application, meaning that I create a child process of my celery application which only runs this function. HTH

这篇关于如何在Celery-Django应用程序中监视来自工作人员的事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆