芹菜单元测试重试 [英] Celery unit test retrying

查看:25
本文介绍了芹菜单元测试重试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我当前正在为我的芹菜任务编写单元测试,希望测试我的任务是否正在重试。

注意:在测试设置中,ALWAYS_EAGER设置为True

@app.shared_task(bind=True, soft_time_limit=600, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3},
                 retry_backoff=3)
def my_task(self, obj_pk):
    try:
        # ...
        function_call_that_can_raise_exception()
    except Exception as exc:
        last_try = self.request.retries >= self.retry_kwargs["max_retries"]

        if last_try:
            # ....
        else:
            # ...
        
        raise_with_context(exc)

我可以通过模拟celery.app.task.Task.request;

来测试我的上一次运行
@mock.patch("celery.app.task.Task.request")
def test_my_task(self):
    mock_task_request.retries = 3
    my_task(12)
    # some asserts

如何测试任务是否已自动重试?

推荐答案

诀窍是使用Apply而不是Delay或Apply_Async:

def test_my_task_is_retried(self):
    my_task.apply(kwargs={"obj_pk": 12})
    # assert what should only happen in the last run

这篇关于芹菜单元测试重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆