FastAPI依赖项(Year):如何手动调用? [英] FastAPI dependencies (yield): how to call them manually?

查看:0
本文介绍了FastAPI依赖项(Year):如何手动调用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

FastAPI使用Depends()注入返回或产生的变量。例如,FastAPI/SQL

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
...
def create_user(db: Session = Depends(get_db)):
...
如果我想在其他地方(在FastAPI路由外)使用get_db(),我该如何操作?我知道这是Python的核心知识,但我似乎搞不懂。我最初的想法是db = yield from get_db(),但我不能在异步函数中调用yield from(并且不知道它是否还能工作)。然后我试了一下:

with get_db() as db:
   pass

失败,因为原始get_db()没有包装为@contextmanager。(请注意,我不想修饰这一点--我使用get_db作为示例,我需要处理更复杂的依赖项)。最后,我尝试了db = next(get_db())--这很管用,但我不认为这是正确的解决方案。何时/如何调用finally-当我的方法返回时?在其他一些依赖项中,有需要执行的交付后代码;我是否需要再次调用next()以确保代码执行?似乎next()不是正确的方式。有什么想法吗?

推荐答案

您不能将contextmanager用作装饰符,而可以用作返回上下文管理器的函数:

from contextlib import contextmanager

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# synchronously
with contextmanager(get_db)() as session:  # execute until yield. Session is yielded value
    pass
# execute finally on exit from with

但请记住,代码将同步执行。如果您想在线程中执行它,那么您可以使用FastAPI工具:

import asyncio
from contextlib import contextmanager

from fastapi.concurrency import contextmanager_in_threadpool


async def some_coro():
    async with contextmanager_in_threadpool(contextmanager(get_db)()) as session:
        pass

这篇关于FastAPI依赖项(Year):如何手动调用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆