如何对运行celery任务的代码进行单元测试? [英] How to unit test code that runs celery tasks?

查看:85
本文介绍了如何对运行celery任务的代码进行单元测试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在处理的应用程序是高度异步的.Web应用程序根据用户操作通过芹菜运行许多任务.芹菜任务本身可以启动更多任务.

The app I am working on is heavily asynchronous. The web application runs a lot of tasks through celery depending on user actions. The celery tasks themselves are capable of launching further tasks.

如下所示的代码在我们的代码库中非常频繁地出现.

Code such as the one shown below occurs in our code base quite frequently.

def do_sth():
    logic();
    if condition:
         function1.apply_async(*args)
    else:
         function2.apply_asynch(*args)

现在,我们要开始对所编写的任何新代码进行单元测试,并且不确定如何执行此操作.我们想在 pytest 单元测试中断言的是,我们要查看function1是否真正被调用.我们并不一定要自己运行 function1 ,因为我们将对 function1 进行单元测试.

Now we want to start unit testing any new code that we write and we are not sure how to do this. What we would like to assert in our pytest unit tests is that we want to see if function1 actually got called. We do not want to necessarily run function1 itself as we will have a unit test for the function1.

我不想在过程中运行celery,也不想在单元测试期间运行任何AMQP代理.

I do not want to be running celery as a process, neither do I want to run any AMQP broker during the unit test session.

这可以实现吗?

修改

有人指出,这与您如何对Celery任务进行单元测试?

不是.想一想.我要问的是如何通过apply_async测试函数是否已调用 function1 .这个问题是关于我如何测试 function1 本身.有很大的不同.在确定这个问题之前,我确实遇到了这个问题.

It is not. Think about it. What I am asking is how to test if function has called function1 through apply_async. That question is about how do I test function1 itself. There is a major difference. I did hit that question before framing this one.

推荐答案

看看

Have a look at the unittest.mock module which allows replacing functions you don't want to be invoked with "mocks" which look sufficiently similar to the original functions to convince the calling code that it's invoking the "real thing". You can then check that the mock was actually invoked and with what parameters. Example:

from unittest.mock import patch

def test_do_sth():
    with patch('function1.apply_async') as function1_mock:
        do_sth()
        assert function1_mock.called

这篇关于如何对运行celery任务的代码进行单元测试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆