How to do multiprocessing in FastAPI

Question

While serving a FastAPI request, I have a CPU-bound task to do on every element of a list. I'd like to do this processing on multiple CPU cores.

What's the proper way to do this within FastAPI? Can I use the standard multiprocessing module? All the tutorials/questions I found so far only cover I/O-bound tasks like web requests.

Answer

async def endpoint

You can use loop.run_in_executor with a ProcessPoolExecutor to run a function in a separate process without blocking the event loop.

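import asyncio
import concurrent.futures

# Assumes `app = FastAPI()` and a module-level `cpu_bound_func` are defined elsewhere.
@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait for the result
    return {"result": result}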
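The same mechanism can be tried outside FastAPI. Below is a minimal standalone sketch using only the standard library. `count_primes` is a hypothetical stand-in for `cpu_bound_func`, and `heartbeat` is a hypothetical helper that keeps appending to a list on the event loop while the computation runs in a worker process, which shows that the loop is not blocked while the coroutine awaits `run_in_executor`.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """Hypothetical CPU-bound stand-in: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def heartbeat(ticks: list) -> None:
    # Keeps running on the event loop while the CPU-bound work happens in another process.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def main(limit: int) -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    with ProcessPoolExecutor(max_workers=2) as pool:
        # This coroutine is suspended here; other coroutines keep running meanwhile.
        result = await loop.run_in_executor(pool, count_primes, limit)
    beat.cancel()
    return result, len(ticks)


if __name__ == "__main__":
    primes, ticks = asyncio.run(main(200_000))
    print(f"{primes} primes found; event loop ticked {ticks} times meanwhile")
```

The `if __name__ == "__main__":` guard matters on platforms whose default start method is spawn (Windows and macOS), because worker processes re-import the main module. Leaving the `with` block calls `shutdown(wait=True)`, which blocks the event loop briefly; creating the pool once at application startup avoids paying that cost on every request.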

def endpoint

Since def endpoints are implicitly run in a separate thread, you can use the full power of the multiprocessing and concurrent.futures modules. Note that await cannot be used inside a def function. Examples:

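import concurrent.futures
import multiprocessing

# `f` must be a module-level function so it can be pickled and sent to the workers.
@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
    return {"result": result}

@app.post("/def-endpoint/")
def test_endpoint_executor():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(f, [1, 2, 3]))  # map() returns a lazy iterator
    return {"results": results}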
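Applied to the situation in the question (a CPU-bound step for every element of a list), a plain function that could be called from a def endpoint might look like the sketch below. `score` and `process_items` are hypothetical names, and `score` only burns CPU for illustration. `executor.map` returns results in input order; with a ProcessPoolExecutor its `chunksize` argument sends items to the workers in batches, which reduces inter-process overhead for long lists. The chunk-size formula is an illustrative heuristic, not a recommendation.

```python
from concurrent.futures import ProcessPoolExecutor
from typing import List


def score(item: int) -> int:
    """Hypothetical per-element CPU-bound work; defined at module level so it can be pickled."""
    total = 0
    for i in range(item * 1000):
        total += i % 7
    return total


def process_items(items: List[int], max_workers: int = 4) -> List[int]:
    # Plain synchronous function, as it would run inside a def endpoint's worker thread.
    chunk = max(1, len(items) // (max_workers * 4))  # illustrative heuristic
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # map() preserves input order; list() collects every result before the pool shuts down.
        return list(executor.map(score, items, chunksize=chunk))


if __name__ == "__main__":
    print(process_items(list(range(10))))
```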

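Note: keep in mind that creating a process pool inside an endpoint, as well as creating a large number of threads, can slow down responses as the number of requests grows.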

The easiest and most native way to execute a function in a separate process and immediately wait for its result is to use loop.run_in_executor with a ProcessPoolExecutor.

A pool can be created once when the application starts, as in the example below; do not forget to shut it down when the application exits. The number of processes in the pool can be set with the max_workers parameter of the ProcessPoolExecutor constructor. If max_workers is None or not given, it defaults to the number of processors on the machine.

The disadvantage of this approach is that the request handler (path operation) waits for the computation in the separate process to complete while the client connection stays open. And if the connection is lost for some reason, there is nowhere to return the results to.
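The point that the work and the waiting are decoupled can be made concrete with a small sketch. Whether a handler gives up because of a timeout or simply has no one to send the result to, a job that has already started in the worker process runs to completion and its result is discarded. Here `slow_task` and `wait_with_timeout` are hypothetical helpers, `time.sleep` stands in for CPU work, and the timeout is applied with `asyncio.wait_for`.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def slow_task(seconds: float) -> str:
    """Hypothetical long-running job (sleep stands in for CPU work)."""
    time.sleep(seconds)
    return "done"


async def wait_with_timeout(pool: ProcessPoolExecutor, seconds: float, timeout: float):
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(pool, slow_task, seconds)
    try:
        return await asyncio.wait_for(fut, timeout)
    except asyncio.TimeoutError:
        # The caller stops waiting, but a job already running in the worker
        # process is not interrupted; its result is simply thrown away.
        return None


async def main() -> tuple:
    with ProcessPoolExecutor(max_workers=1) as pool:
        fast = await wait_with_timeout(pool, 0.0, timeout=5.0)
        slow = await wait_with_timeout(pool, 0.5, timeout=0.05)
    # Leaving the `with` block waited for the abandoned 0.5 s job to finish.
    return fast, slow


if __name__ == "__main__":
    print(asyncio.run(main()))
```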

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
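The `on_event` startup and shutdown handlers used above still work, but newer FastAPI releases mark them as deprecated in favor of a `lifespan` function. Below is a sketch of the same pool management written as a lifespan context manager. To keep it runnable without FastAPI installed, a `SimpleNamespace` stands in for the app (only `.state` is used), `cpu_bound_func` is a hypothetical stand-in for the one imported from `calc`, and `MAX_WORKERS` is a hypothetical setting passed to `max_workers`.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from types import SimpleNamespace

MAX_WORKERS = 2  # hypothetical setting; None would mean "one worker per processor"


def cpu_bound_func(param: int) -> int:
    """Hypothetical stand-in for calc.cpu_bound_func."""
    return sum(i * i for i in range(param))


@asynccontextmanager
async def lifespan(app):
    # One pool for the whole application, created at startup...
    app.state.executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
    try:
        yield
    finally:
        # ...and shut down when the application exits.
        app.state.executor.shutdown(wait=True)


async def run_in_process(app, fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)


async def demo() -> int:
    # Stand-in for a FastAPI app object: only the `.state` attribute is used.
    fake_app = SimpleNamespace(state=SimpleNamespace())
    async with lifespan(fake_app):
        return await run_in_process(fake_app, cpu_bound_func, 10)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With FastAPI itself, the same function would be passed as `app = FastAPI(lifespan=lifespan)`, replacing both `on_event` handlers.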

Move to background

Usually, CPU-bound tasks are executed in the background. FastAPI can run background tasks after returning a response, and inside such a task you can start the CPU-bound work and asynchronously wait for its result.

In this case, for example, you can immediately return a response of "Accepted" (HTTP code 202) and a unique task ID, continue calculations in the background, and the client can later request the status of the task using this ID.

BackgroundTasks offer some useful features. In particular, you can add several of them (including from dependencies), and inside them you can use resources obtained in dependencies, which are cleaned up only when all tasks have completed, while exceptions can still be handled correctly. This can be seen more clearly in this diagram. (This cleanup ordering applies to FastAPI versions before 0.106.0; later releases changed when the exit code of dependencies with yield runs, so check the documentation for your version.)
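To illustrate the exception handling side: an exception raised inside a worker process is sent back to the parent and re-raised at the `await`, so a background coroutine can catch it and record the failure in its job status. A minimal sketch with hypothetical `risky_task` and `run_job` functions:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def risky_task(x: int) -> int:
    """Hypothetical CPU-bound function that fails for negative input."""
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x * 2


async def run_job(pool: ProcessPoolExecutor, x: int) -> dict:
    loop = asyncio.get_running_loop()
    try:
        # An exception raised in the worker is pickled back and re-raised here.
        result = await loop.run_in_executor(pool, risky_task, x)
    except ValueError as exc:
        return {"status": "failed", "error": str(exc)}
    return {"status": "complete", "result": result}


async def main() -> list:
    with ProcessPoolExecutor(max_workers=2) as pool:
        return await asyncio.gather(run_job(pool, 21), run_job(pool, -1))


if __name__ == "__main__":
    print(asyncio.run(main()))
```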

Below is an example that performs minimal task tracking. It assumes a single running instance of the application, because the job registry lives in that process's memory.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: Optional[int] = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # in-memory job store, local to this process


async def run_in_process(fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    try:
        jobs[uid].result = await run_in_process(cpu_bound_func, param)
        jobs[uid].status = "complete"
    except Exception:
        # Without this, a job whose function raised would stay "in_progress" forever
        jobs[uid].status = "failed"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    if uid not in jobs:
        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Unknown job ID")
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

More powerful solutions

All of the above examples are fairly simple, but if you need a more powerful system for heavy distributed computing, you can look at message brokers such as RabbitMQ, Kafka, or NATS, and at libraries built on them, such as Celery.
