执行芹菜任务后如何测试电子邮件是否已发送 [英] How to test if email was sent after executing celery task
问题描述
我正在使用Django 1.10和Celery 4.1
I'm using Django 1.10 and Celery 4.1
我有一个 shared_task
,它可以向用户发送电子邮件.
I have a shared_task
which sends an email to the user.
# myapp/tasks.py
@shared_task
def notify_user(user_id):
# TODO: send email and do other stuff here
user = get_object_or_404(User, pk=user_id)
send_mail(
'Subject',
'Body',
'from@example.com',
[user.email],
)
我还有另一个文件,其中包含一个函数,该函数将把这些任务放入队列.
I have another file which contains a function that calls puts that tasks into the queue.
# myapp/utils.py
# ...
def update_queue(self):
# increment no_of_used_referrals by 1
if no_of_used_referrals == 5:
notify_user.apply_async((self.user_id,))
else:
notify_user.apply_async((self.user_id,), eta=new_eta)
现在,我正在尝试测试调用 update_queue()
(所有必需的检查均通过)是否在用户执行时向用户发送电子邮件.
Now I am trying to test whether calling update_queue()
(where all required checks passes) sends an email to the user when its executed.
我尝试执行以下操作:
# myapp/tests.py
def update_queue_should_call_notify_user_immediately_after_five_referrals_were_used(self):
with unittest.mock.patch('myapp.tasks.notify_user.apply_async') as notify_user_mock:
# ...
for _ in range(5):
entry.update_queue()
self.assertTrue(notify_user_mock.called)
notify_user_mock.assert_called_with((user_id,))
# TODO: check if email was sent
# I tried using :
# self.assertEqual(len(mail.outbox), 1)
# but it fails with error saying 0 != 1
def test_notify_user_should_send_an_email(self):
notify_user.apply_async((user_id,))
# I tried using:
# self.assertEqual(len(mail.outbox), 1)
# but it fails with error saying 0 != 1
我在项目设置中设置了 EMAIL_BACKEND ='django.core.mail.backends.locmem.EmailBackend'
.
I have set EMAIL_BACKEND = 'django.core.mail.backends.locmem.EmailBackend'
in my project settings.
有人可以告诉我我做的事情有什么问题以及如何正确测试此案吗?
Can someone please tell me what is wrong with what I am doing and how to correctly test this case?
编辑我已经更新了排除模拟的代码-如@DanielRoseman所建议.
EDIT I have updated my code where I excluded mocking - as suggested by @DanielRoseman.
EDIT2 请在上方查看更新的文件.
EDIT2 Please see updated files above.
我正在模拟推荐系统.一旦使用了与特定用户相关的5个引荐链接,该用户的个人资料就会获得一些不错的功能.否则,他们必须等待特定的时间,这是我在 apply_async
上使用 eta
参数设置的.
I am simulating referral system. Once 5 referral links associated with a particular user have been used, user get's some nice feature to their profile. Otherwise they have to wait for a specific time, which I set using eta
argument on apply_async
.
每次我致电 update_queue
时,我都会检查推介次数是否等于5(请参见上面的更新代码).
Every time I call update_queue
I check if the number of referals is equal to 5(please see updated code above).
- 如果是-我想立即调用
notify_user
,而不传递eta
参数值. - 如果不是-我增加了使用的引荐链接的数量,请撤消旧的
notify_user
任务,并使用新的eta
参数创建新的notify_user
任务值.
- If it is - I want to call
notify_user
immediately, without passingeta
argument value. - If it is not - I increment number of used referral links, revoke old
notify_user
task, create newnotify_user
task with neweta
argument value.
为了测试我是否正在for循环中模拟该行为,并且我想测试在5次迭代(等于5个使用的引用链接)之后是否向用户发送了一封电子邮件(出于测试目的,我在-电子邮件的内存后端.)
In order to test that I am simulating that behaviour in for-loop, and I want to test whether after 5 iterations(equal to 5 used referral links) an email was sent to the user (for test purposes I use in-memory backend for email).
推荐答案
我把它放在这里的目的是要面对同样的问题.
I put it here for someone that will face the same problem.
我已经解决了
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
https://stackoverflow.com/a/46531472/7396169
我认为该解决方案适合单元测试.
I think this solution is suitable for unit testing.
这篇关于执行芹菜任务后如何测试电子邮件是否已发送的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!