Python - Retry a failed Celery task from another queue

Question

I'm posting data to a web service from a Celery task. Sometimes the data is not posted because the internet connection is down, and the task is retried endlessly until it succeeds. Retrying like that is unnecessary: while the network is down, there is no point attempting the task again and again.

I thought of a better solution: if a task fails thrice (retrying a minimum of 3 times), it is shifted to another queue that holds all the failed tasks. Then, when the internet is back up and the data is being posted over the net, i.e. tasks are completing from the normal queue, the worker starts processing the tasks from the failed-task queue. This avoids wasting CPU on retrying the same task over and over.

Here's my code. As of right now I'm just retrying the task, but I doubt that's the right way of doing it.

from decimal import Decimal

from celery import shared_task
from pysimplesoap.client import SoapClient  # assuming pysimplesoap, which matches this SoapClient signature


@shared_task(default_retry_delay=1 * 60, max_retries=10)
def post_data_to_web_service(data, url):
    try:
        # Build a SOAP client pointed at the given endpoint.
        client = SoapClient(
            location=url,
            action='http://tempuri.org/IService_1_0/',
            namespace="http://tempuri.org/",
            soap_ns='soap', ns=False,
        )

        # Post the shipment dimensions to the web service.
        response = client.UpdateShipment(
            Weight=Decimal(data['Weight']),
            Length=Decimal(data['Length']),
            Height=Decimal(data['Height']),
            Width=Decimal(data['Width']),
        )

    except Exception as exc:
        # Any failure (e.g. the network being down) schedules a retry.
        raise post_data_to_web_service.retry(exc=exc)

How do I maintain 2 queues simultaneously and execute tasks from both of them?

Settings.py

BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'


Answer

By default, Celery adds all tasks to a queue named celery. So you can run your task there; when an exception occurs it retries, and once it reaches the maximum number of retries you can shift the task to a new queue, say foo.

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError

@shared_task(default_retry_delay=1 * 60, max_retries=10)
def post_data_to_web_service(data, url):
    try:
        pass  # do something with the given args
    except Exception as exc:
        try:
            # Schedule another attempt until max_retries is exhausted.
            raise post_data_to_web_service.retry(exc=exc)
        except MaxRetriesExceededError:
            # Out of retries: hand the task over to the 'foo' queue.
            post_data_to_web_service.apply_async((data, url), queue='foo')

When you start your worker, this task will try to do something with the given data. If it fails, it will retry 10 times with a delay of 60 seconds. Once it hits MaxRetriesExceededError, it posts the same task to the new queue foo.
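
For illustration, a minimal sketch of how the task might be enqueued on the default queue; the shipment values and URL below are hypothetical:

post_data_to_web_service.delay(
    {'Weight': '1.5', 'Length': '10', 'Height': '4', 'Width': '6'},  # hypothetical shipment data
    'http://example.com/shipping.svc',  # hypothetical service endpoint
)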

To consume these tasks you have to start a new worker

celery worker -l info -A my_app -Q foo

Or you can consume these tasks from the default worker if you start it with:

celery worker -l info -A my_app -Q celery,foo
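
With the Redis broker, queues are created on demand, so no extra configuration is strictly required. If you want to declare both queues explicitly, a sketch of what could be added to Settings.py (using kombu's Queue alongside the old-style CELERY_* settings shown above):

from kombu import Queue

CELERY_DEFAULT_QUEUE = 'celery'
CELERY_QUEUES = (
    Queue('celery'),  # default queue for first attempts and retries
    Queue('foo'),     # holds tasks that exhausted their retries
)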
