如何在测试FastAPI应用时触发寿命启动和关机? [英] How to trigger lifespan startup and shutdown while testing FastAPI app?

查看:0
本文介绍了如何在测试FastAPI应用时触发寿命启动和关机?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

作为FastAPI的新手,我正在努力测试比我在教程中看到的稍微更难的代码。我使用fastapi_cache模块和Redis如下:

from fastapi import Depends, FastAPI, Query, Request
from fastapi_cache.backends.redis import CACHE_KEY, RedisCacheBackend
from fastapi_cache import caches, close_caches

app = FastAPI()

def redis_cache():
    return caches.get(CACHE_KEY)    

@app.get('/cache')
async def test(
    cache: RedisCacheBackend = Depends(redis_cache),
    n: int = Query(
        ..., 
        gt=-1
    )
):  
    # code that uses redis cache

@app.on_event('startup')
async def on_startup() -> None:
    rc = RedisCacheBackend('redis://redis')
    caches.set(CACHE_KEY, rc)

@app.on_event('shutdown')
async def on_shutdown() -> None:
    await close_caches()

test_main.py如下所示:

import pytest
from httpx import AsyncClient
from .main import app

@pytest.mark.asyncio
async def test_cache():
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.get("/cache?n=150")

当我运行pytest时,它将cache变量设置为None,测试失败。我想我明白为什么代码不起作用了。但我如何修复它才能正确测试我的缓存?

推荐答案

问题是httpx没有实现生命周期协议并触发startup事件处理程序。为此,您需要使用LifespanManager

安装:pip install asgi_lifespan

代码如下:

import pytest
from asgi_lifespan import LifespanManager
from httpx import AsyncClient
from .main import app


@pytest.mark.asyncio
async def test_cache():
    async with LifespanManager(app):
        async with AsyncClient(app=app, base_url="http://localhost") as ac:
            response = await ac.get("/cache")

更多信息请点击此处:https://github.com/encode/httpx/issues/350

这篇关于如何在测试FastAPI应用时触发寿命启动和关机?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆