使Django测试用例数据库对Celery可见 [英] Make Django test case database visible to Celery

查看:73
本文介绍了使Django测试用例数据库对Celery可见的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当Django测试用例运行时,它将创建一个隔离的测试数据库,以便在每个测试完成时回滚数据库写操作。我正在尝试使用Celery创建一个集成测试,但是我不知道如何将Celery连接到该临时测试数据库。在朴素的设置中,保存在Django中的对象对于Celery是不可见的,保存在Celery中的对象将无限期持久。

When a Django test case runs, it creates an isolated test database so that database writes get rolled back when each test completes. I am trying to create an integration test with Celery, but I can't figure out how to connect Celery to this ephemeral test database. In the naive setup, Objects saved in Django are invisible to Celery and objects saved in Celery persist indefinitely.

这里是一个示例测试用例:

Here is an example test case:

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

我有两个问题:


  1. 如何做到这一点,以便Celery可以看到Django测试用例的对象?

  1. How do make it so that Celery can see the objects that the Django test case?

如何确保测试完成后自动将Celery保存的所有对象回滚?

How do I ensure that all objects saved by Celery are automatically rolled back once the test completes?

如果无法自动执行操作,我愿意手动清理对象,但是要删除 tearDown 即使在 APISimpleTestCase 中也似乎已回滚。

I am willing to manually clean up the objects if doing this automatically is not possible, but a deletion of objects in tearDown even in APISimpleTestCase seems to be rolled back.

推荐答案

这是

Django的内存数据库为sqlite3 。就像在 Sqlite内存数据库的描述页面中所说的那样, [[A] ll个数据库连接共享内存数据库必须处于同一过程中。这意味着,只要Django使用内存中的测试数据库并且Celery是在单独的进程中启动的,从根本上来说,让Celery和Django共享测试数据库是不可能的。

Django's in-memory database is sqlite3. As it says on the description page for Sqlite in-memory databases, "[A]ll database connections sharing the in-memory database need to be in the same process." This means that, as long as Django uses an in-memory test database and Celery is started in a separate process, it is fundamentally impossible to have Celery and Django to share a test database.

但是,使用 celery.contrib.testing.worker.start_worker ,可以在同一进程中的单独线程中启动Celery worker。该工作人员可以访问内存数据库。

However, with celery.contrib.testing.worker.start_worker, it possible to start a Celery worker in a separate thread within the same process. This worker can access the in-memory database.

这假定Celery已在使用Django项目的常用方法

This assumes that Celery is already setup in the usual way with the Django project.

因为Django-Celery涉及到一些跨线程通信,所以只有不在独立事务中运行的测试用例才能起作用。测试用例必须直接从 SimpleTestCase 或其其余等效项 APISimpleTestCase 继承,并设置类属性 allow_database_queries True

Because Django-Celery involves some cross-thread communication, only test cases that don't run in isolated transactions will work. The test case must inherit directly from SimpleTestCase or its Rest equivalent APISimpleTestCase and set the class attribute allow_database_queries to True.

关键是要在< TestCase 的code> setUpClass 方法,并在 tearDownClass 方法中将其关闭。关键功能是 celery.contrib.testing.worker.start_worker(app),它需要当前Celery应用程序的一个实例,大概是从 mysite获得的.celery.app 并返回一个Python ContextManager ,它具有 __ enter __ __ exit __ 方法,分别必须在 setUpClass tearDownClass 中调用。可能有一种方法可以避免手动输入 ContextManager 并将其与装饰器或其他东西一起存在,但是我无法弄清楚。以下是 tests.py 文件的示例:

The key is to start a Celery worker in the setUpClass method of the TestCase and close it in the tearDownClass method. The key function is celery.contrib.testing.worker.start_worker(app), which requires an instance of the current Celery app, presumably obtained from mysite.celery.app and returns a Python ContextManager, which has __enter__ and __exit__ methods, which must be called in setUpClass and tearDownClass, respectively. There is probably a way to avoid manually entering and existing the ContextManager with a decorator or something, but I couldn't figure it out. Here is an example tests.py file:

from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app

class BatchSimulationTestCase(SimpleTestCase):
    allow_database_queries = True

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

        # Close worker
        cls.celery_worker.__exit__(None, None, None)

    def test_my_function(self):
        # my_task.delay() or something

无论出于何种原因,测试人员都尝试使用名为的任务'celery.ping',可能会在这种情况下提供更好的错误消息工人失败的原因。甚至将 perform_ping_check 设置为 False 作为关键字参数ot start_worker 仍在测试它的存在。它正在寻找的任务是 celery.contrib.testing.tasks.ping 。但是,默认情况下未安装此任务。应该可以通过在设置中将 celery.contrib.testing 添加到 INSTALLED_APPS 来提供此任务。 .py 。但是,这仅使工人可见。而不是生成工作程序的代码。生成工作线程的代码会在app.tasks 中 app.autodiscover_tasks()可以拾取的位置,例如 celery.py

For whatever reason, the testing worker tries to use a task called 'celery.ping', probably to provide better error messages in the case of worker failure. Even setting perform_ping_check to False as a keyword argument ot start_worker still tests for its existence. The task it is looking for is celery.contrib.testing.tasks.ping. However, this task is not installed by default. It should possible to provide this task by adding celery.contrib.testing to INSTALLED_APPS in settings.py. However, this only makes it visible to the worker; not the code that generates the worker. The code that generates the worker does an assert 'celery.ping' in app.tasks, which fails. Commenting this out makes everything work, but modifying an installed library is not a good solution. I am probably doing something wrong, but the workaround I settled on is to copy the simple function somewhere it can be picked up by app.autodiscover_tasks(), such as celery.py:

@app.task(name='celery.ping')
def ping():
    # type: () -> str
    """Simple task that just returns 'pong'."""
    return 'pong'

现在,在运行测试时,无需启动单独的Celery进程。 Celery worker将作为单独的线程在Django测试过程中启动。该工作人员可以查看任何内存数据库,包括默认的内存测试数据库。要控制工作人员的数量, start_worker 中提供了一些选项,但默认情况下是单个工作人员。

Now, when the tests are run, there is no need to start a separate Celery process. A Celery worker will be started in the Django test process as a separate thread. This worker can see any in-memory databases, including the default in-memory test database. To control the number of workers, there are options available in start_worker, but it appears the default is a single worker.

这篇关于使Django测试用例数据库对Celery可见的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆