How to do multiprocessing in FastAPI
Question
While serving a FastAPI request, I have a CPU-bound task to do on every element of a list. I'd like to do this processing on multiple CPU cores.
What's the proper way to do this within FastAPI? Can I use the standard multiprocessing
module? All the tutorials/questions I found so far only cover I/O-bound tasks like web requests.
Recommended answer
async def endpoint
You can use loop.run_in_executor with ProcessPoolExecutor to start a function in a separate process.
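```python
import asyncio
import concurrent.futures
# assumes `app = FastAPI()` and a module-level `cpu_bound_func` are defined elsewhere

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait for the result
    return {"result": result}
```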
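The question asks about running a CPU-bound task on every element of a list. Below is a minimal sketch of one way to do that from async code, assuming an existing executor: one run_in_executor call per element, all awaited together with asyncio.gather so the event loop is never blocked. The names cpu_heavy and map_in_executor are hypothetical; cpu_heavy only stands in for the real CPU-bound function.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor


def cpu_heavy(n: int) -> int:
    # Hypothetical stand-in for a real CPU-bound function. It must live at
    # module level so ProcessPoolExecutor can pickle it for the workers.
    return sum(i * i for i in range(n))


async def map_in_executor(executor: Executor, fn, items):
    # Submit one call per list element. The pool spreads the calls over its
    # worker processes, and gather() awaits all of them without blocking the
    # event loop. Results come back in the same order as `items`.
    loop = asyncio.get_running_loop()
    return await asyncio.gather(*(loop.run_in_executor(executor, fn, item) for item in items))


async def main() -> None:
    with ProcessPoolExecutor() as pool:
        print(await map_in_executor(pool, cpu_heavy, [10, 100, 1000]))


if __name__ == "__main__":
    asyncio.run(main())
```

Inside an async endpoint the helper could be awaited with a pool that is created once at startup (as in the shared-pool example further down) instead of a new pool per request. Because the helper accepts any Executor, it can also be exercised with a ThreadPoolExecutor in tests.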
def endpoint
Since def endpoints are implicitly run in a separate thread, you can use the full power of the multiprocessing and concurrent.futures modules. Note that await cannot be used inside a def function. Samples:
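```python
import multiprocessing
# assumes `app = FastAPI()` and a picklable, module-level CPU-bound function `f`

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
    return {"result": result}
```

```python
import concurrent.futures
# assumes `app = FastAPI()` and a picklable, module-level CPU-bound function `f`

@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(f, [1, 2, 3]))  # list() collects all results
    return {"results": results}
```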
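Note: keep in mind that creating a process pool inside an endpoint, as well as creating a large number of threads, can slow down responses as the number of requests grows.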
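One way to avoid paying the pool start-up cost on every request in a def endpoint is to create the pool once and reuse it. This is a minimal sketch under that assumption, using a lazily created module-level pool; get_executor, process_items and cpu_heavy are hypothetical names, not part of FastAPI.

```python
import atexit
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache


def cpu_heavy(n: int) -> int:
    # Hypothetical stand-in for a real CPU-bound function (module-level, so picklable).
    return sum(i * i for i in range(n))


@lru_cache(maxsize=1)
def get_executor() -> ProcessPoolExecutor:
    # Created on the first call, then the same pool is returned to every later caller.
    executor = ProcessPoolExecutor(max_workers=3)
    atexit.register(executor.shutdown)  # shut the pool down when the interpreter exits
    return executor


def process_items(items: list[int]) -> list[int]:
    # What a def endpoint body could call instead of building a new pool each time.
    return list(get_executor().map(cpu_heavy, items))


if __name__ == "__main__":
    print(process_items([10, 100, 1000]))
```

The trade-off is that the pool lives as long as the process; the startup/shutdown-event approach below makes that lifetime explicit instead.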
The simplest and most native way to execute a function in a separate process and immediately wait for the result is to use loop.run_in_executor with ProcessPoolExecutor.
A pool can be created when the application starts, as in the example below; do not forget to shut it down when the application exits. The number of processes in the pool can be set with the max_workers parameter of the ProcessPoolExecutor constructor. If max_workers is None or not given, it defaults to the number of processors on the machine.
The disadvantage of this approach is that the request handler (path operation) waits for the computation to complete in a separate process while the client connection remains open. And if the connection is lost for some reason, the result has nowhere to be returned.
```python
import asyncio
from concurrent.futures.process import ProcessPoolExecutor

from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
```
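Recent FastAPI versions deprecate on_event in favour of a lifespan async context manager passed to the FastAPI constructor. Below is a sketch of the same startup/shutdown logic in that style, assuming a FastAPI version that supports the lifespan parameter; only the standard library is used here, and the FastAPI wiring is shown as a comment.

```python
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app):
    # Startup: create one shared pool for the whole application.
    app.state.executor = ProcessPoolExecutor()
    try:
        yield
    finally:
        # Shutdown: release the worker processes even if startup/serving failed.
        app.state.executor.shutdown()


# Wiring (assumes a FastAPI version with lifespan support):
# app = FastAPI(lifespan=lifespan)
```

The run_in_process helper and the handler stay unchanged; only the two on_event functions are replaced.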
Move to background
Usually, CPU-bound tasks are executed in the background. FastAPI can run background tasks after returning a response; inside such a task you can start the CPU-bound work and asynchronously wait for its result.
In this case, for example, you can immediately return a response of "Accepted" (HTTP code 202) with a unique task ID, continue the calculation in the background, and let the client request the status of the task later using this ID.
BackgroundTasks provide some useful features: in particular, you can add several of them (including from dependencies), and they can use resources obtained in dependencies, which are cleaned up only when all tasks have completed, while exceptions can still be handled correctly. This can be seen more clearly in this diagram. (This describes older FastAPI versions; FastAPI 0.106.0 changed when the exit code of dependencies with yield runs relative to background tasks.)
Below is an example that performs minimal task tracking. It assumes a single running instance of the application, because the jobs dictionary lives in the memory of that one process.
```python
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: Optional[int] = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    if uid not in jobs:
        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Unknown task ID")
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
```
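In the task-tracking example, an exception raised by the CPU-bound function leaves the job "in_progress" forever. Here is a minimal sketch of one way to record failures, assuming a plain dataclass in place of the pydantic Job model and an extra error field; Job, run_job and the "failed" status value are illustrative, not part of FastAPI.

```python
import asyncio
from concurrent.futures import Executor
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from uuid import UUID, uuid4


@dataclass
class Job:
    # Plain dataclass stand-in for the pydantic Job model, plus an `error` field.
    uid: UUID = field(default_factory=uuid4)
    status: str = "in_progress"
    result: Optional[Any] = None
    error: Optional[str] = None


async def run_job(executor: Executor, jobs: Dict[UUID, Job], uid: UUID, fn, *args) -> None:
    # Same idea as start_cpu_bound_task, but an exception raised in the worker
    # marks the job as "failed" instead of leaving it "in_progress".
    loop = asyncio.get_running_loop()
    try:
        jobs[uid].result = await loop.run_in_executor(executor, fn, *args)
        jobs[uid].status = "complete"
    except Exception as exc:
        jobs[uid].status = "failed"
        jobs[uid].error = repr(exc)
```

In the application it would be scheduled with background_tasks.add_task(run_job, app.state.executor, jobs, new_task.uid, cpu_bound_func, param), and the status endpoint would then report the error to the client.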
More powerful solutions
All of the above examples are fairly simple. If you need a more powerful system for heavy distributed computing, look at message brokers such as RabbitMQ, Kafka, or NATS, and at libraries built on them, such as Celery.