Asyncio.create_subprocess_exec NotImplementedError-Fastapi后台任务 [英] Asyncio.create_subprocess_exec NotImplementedError - Fastapi Background Task

查看:0
本文介绍了Asyncio.create_subprocess_exec NotImplementedError-Fastapi后台任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正尝试在Fastapi后台任务中调用asyncio.create_subprocess_exec,但它不断引发NotImplementedError。当run_subprocess函数在Fastapi之外运行时,它工作得很好。我在Windows中使用的是asyncio循环,而不是uvloop。

import asyncio
from fastapi import FastAPI, BackgroundTasks

DHCP_SERVER = "1.1.1.1"

app = FastAPI()

@app.get("/")
async def subprocess_test(background_tasks: BackgroundTasks):
  background_tasks.add_task(run_subprocess)

async def run_subprocess():
  proc = await asyncio.create_subprocess_exec(
    'powershell.exe',
    f'Get-Dhcp-Serverv4Scope -ComputerName "{DHCP_SERVER}"',
    stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
  )

  stdout, stderr = await proc.communicate()

  if stderr:
    print(stderr)
  else:
    print(stdout)


File ".subprocess_example.py", line 13 in run_subprocess
  proc = await asyncio.create_subprocess_exec(
File "C:PythonPython38-32libasynciosubprocess.py", line 236, in create_subprcess_exec
  transport, protocol = await loop.subprocess_exec(
File "C:PythonPython38-32libasyncioase_events.py", line 1615, in subprocess_exec
  transport = await self._make_subprocess_transport(
File "C:PythonPython38-32libasyncioase_events.py", line 487, in _make_subprocess_transport
  raise NotImplementedError

有人能帮我解决这个问题吗?

谢谢!

推荐答案

我相信这个错误是因为Fastapi使用uvloop,而asyncio在没有设置策略的情况下不知道这一点,有一个答案提供了如何实现这一点的钩子;

asyncio event loop equivalent with uvloop

这篇关于Asyncio.create_subprocess_exec NotImplementedError-Fastapi后台任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆