单元测试芹菜中的AsyncResult [英] Unit testing an AsyncResult in celery

查看:182
本文介绍了单元测试芹菜中的AsyncResult的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图在Django的单元测试框架中测试一些芹菜功能,但是每当我尝试检查AsyncResult时,测试就像从未开始。



我知道这个代码在一个真正的RabbitMQ环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。
$ b

这是一个例子:

  @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
CELERY_ALWAYS_EAGER = true,
BROKER_BACKEND ='memory',)
def test_celery_do_work(self):
result = myapp.tasks.celery_do_work.AsyncResult('blat')
applied_task = myapp。 tasks.celery_do_work.apply_async((),task_id ='blat')
applied_task.wait()
#这个SUCCEEDS
self.assertTrue(applied_task.successful())
#这个失败
self.assertTrue(result.successful())

使用ALWAYS_EAGER选项禁用AsyncResult功能,因为它立即执行?如果是这样,有没有办法单元测试AsyncResult状态检查?如果我尝试取出ALWAYS_EAGER选项,测试永远不会运行,所以我很失落。



谢谢!

解决方案

CELERY_ALWAYS_EAGER True 时,调用 apply_async()实际上被替换为 apply()。返回的结果是一个 EagerResult ,它已经包含了您的任务结果。



所以,是的,设置 ALWAYS_EAGER = True 禁用整个 AsyncResult 功能。整个异步进程被忽略,并且没有任务实际上被发送到代理,这就是为什么你不能通过 AsyncResult 检索结果。



使用 CELERY_ALWAYS_EAGER = True 当您测试只需要Celery结果的代码路径时,使用 EagerResult AsyncResult



如果需要,有一种方法来运行测试使用 AsyncResult ,使用 CELERY_ALWAYS_EAGER = False ,但是为此,您需要在调用测试用例中的任务然后工作人员可以执行你的任务,而 AsyncResult 将会正常工作。您可以看看 django-celery-testworker ,尽管我还没有测试它。


I am trying to test some celery functionality in Django's unit testing framework, but whenever I try to check an AsyncResult the tests act like it was never started.

I know this code works in a real environment with RabbitMQ, so I was just wondering why it didn't work when using the testing framework.

Here is an example:

@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
                   CELERY_ALWAYS_EAGER = True,
                   BROKER_BACKEND = 'memory',)
def test_celery_do_work(self):
    result = myapp.tasks.celery_do_work.AsyncResult('blat')
    applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat')
    applied_task.wait()
    # THIS SUCCEEDS
    self.assertTrue(applied_task.successful())
    # THIS FAILS
    self.assertTrue(result.successful())

Does using the ALWAYS_EAGER option disable the AsyncResult functionality since it's executing immediately? If so, is there any way to able to unit test AsyncResult status checking? If I try to take out the ALWAYS_EAGER option the tests never run, so I am at a loss.

Thanks!

解决方案

When CELERY_ALWAYS_EAGER is True, the call to apply_async() is actually replaced with apply(). The result returned is an EagerResult, which already contains the result of your task.

So, yes, setting ALWAYS_EAGER = True disables the entire AsyncResult functionnality. The entire async process is bypassed, and no task is actually sent to the broker, which is why you cannot retrieve the result through an AsyncResult.

Use CELERY_ALWAYS_EAGER = True when you are testing code path which just need a Celery result, and work the same way with an EagerResult or an AsyncResult.

If needed, there is a way to run tests with AsyncResult too, with CELERY_ALWAYS_EAGER = False, but for this, you'll need to start a worker before calling the task in your test case. The worker will then be able to execute your task, and AsyncResult will work just fine. You can take a look at django-celery-testworker which seems to do just that, although I've not tested it.

这篇关于单元测试芹菜中的AsyncResult的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆