重试丢失或失败的任务(Celery、Django 和 RabbitMQ) [英] Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)

查看:47
本文介绍了重试丢失或失败的任务(Celery、Django 和 RabbitMQ)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有没有办法确定是否有任何任务丢失并重试?

Is there a way to determine if any task is lost and retry it?

我认为丢失的原因可能是调度程序错误或工作线程崩溃.

I think that the reason for lost can be dispatcher bug or worker thread crash.

我打算重试它们,但我不确定如何确定哪些任务需要停用?

I was planning to retry them but I'm not sure how to determine which tasks need to be retired?

以及如何使这个过程自动进行?我可以使用我自己的自定义调度程序来创建新任务吗?

And how to make this process automatically? Can I use my own custom scheduler which will create new tasks?

我从文档中发现 RabbitMQ 从不丢失任务,但是当工作线程在任务执行过程中崩溃时会发生什么?

I found from the documentation that RabbitMQ never loose tasks, but what happens when worker thread crash in the middle of task execution?

推荐答案

你需要的是设置

CELERY_ACKS_LATE = 真

CELERY_ACKS_LATE = True

Late ack 表示任务执行完毕后会确认任务消息,不只是之前,这是默认行为.这样如果worker崩溃了rabbit MQ还是会有消息的.

Late ack means that the task messages will be acknowledged after the task has been executed, not just before, which is the default behavior. In this way if the worker crashes rabbit MQ will still have the message.

显然,在完全崩溃(Rabbit + workers)的同时,没有办法恢复任务,除非您在任务开始和任务结束时实现日志记录.就我个人而言,我在 mongodb 中每次任务开始时写一行,任务完成时写另一行(独立形成结果),这样我可以通过分析 mongo 日志知道哪个任务被中断.

Obviously of a total crash (Rabbit + workers) at the same time there is no way of recovering the task, except if you implement a logging on task start and task end. Personally I write in a mongodb a line every time a task start and another one when the task finish (independently form the result), in this way I can know which task was interrupted by analyzing the mongo logs.

您可以通过重写 celery 基任务类的方法 __call__after_return 轻松实现.

You can do it easily by overriding the methods __call__ and after_return of the celery base task class.

接下来您会看到我的一段代码,它使用 taskLogger 类作为上下文管理器(带有入口和出口点).taskLogger 类只是在 mongodb 实例中写入包含任务信息的行.

Following you see a piece of my code that uses a taskLogger class as context manager (with entry and exit point). The taskLogger class simply writes a line containing the task info in a mongodb instance.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

希望对你有帮助

这篇关于重试丢失或失败的任务(Celery、Django 和 RabbitMQ)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆