Make a CPU-bound task asynchronous for FastAPI WebSockets

Problem description

So I have a CPU-bound, long-running algorithm; let's call it task. Let's say it looks like this:

from fastapi import FastAPI

app = FastAPI()

# BodyModel is a Pydantic model holding the task parameters (definition omitted).

def task(parameters):
    result = 0
    for _ in range(10):
        for _ in range(10):
            for _ in range(10):
                result += do_things()  # do_things() is the actual CPU-bound work
    return result

@app.get('/')
def results(parameters: BodyModel):
    return task(parameters)

If I encapsulate that in a def path operation function, everything works fine because it is run in a separate thread: I can access other paths concurrently, since FastAPI pushes my CPU-bound task to its thread pool. But now I want to switch to WebSockets in order to communicate intermediate results. For that to work, I have to mark the whole thing as async and pass the WebSocket into my task. So it looks like this:

from fastapi import WebSocket


async def task(parameters, websocket: WebSocket):
    result = 0
    for _ in range(10):
        for _ in range(10):
            for _ in range(10):
                intermediate_result = do_things()
                # send_text() expects a str, so convert the intermediate result
                await websocket.send_text(str(intermediate_result))
                result += intermediate_result
    return result


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        parameters = await websocket.receive_text()
        # pass the open WebSocket into the task so it can push updates
        result = await task(parameters, websocket)
        await websocket.send_text(str(result))

Sending the intermediate results works like a charm. BUT now my algorithm blocks FastAPI, as it is not truly asynchronous by itself: once I post a message to '/ws', FastAPI is blocked and does not respond to any other requests until my task is finished.

So I need some advice on how to

  • a) either send WebSocket messages from within a synchronous CPU-bound task (I didn't find a synchronous send_text alternative), so I can keep using def, or
  • b) make my CPU-bound task truly asynchronous, so that it no longer blocks anything when I use async def.

I tried using the ProcessPoolExecutor as described here, but it is not possible to pickle a coroutine, and as far as I understand I have to make my task a coroutine (using async) in order to use websocket.send_text() within it.

Also, I thought about just storing my intermediate results somewhere, making an HTTP POST to start my task, and then having another WebSocket connection read and send the intermediate results. But then I could just as well start a background task and implement a regular HTTP polling mechanism. I don't want either, mainly because I plan to use Google Cloud Run, which limits the CPU when all connections are closed, and because I think it's better practice to teach my task how to communicate via WebSocket directly.

I hope my question is clear. This is my first larger-scale project with FastAPI and asynchronicity, and I haven't really used AsyncIO before, so I might have just missed something. Thanks for your suggestions.

Recommended answer

In case someone comes across this, I'll add the solution that works for me now.

I followed: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

The key is to make it non-blocking. So, for example, instead of:

# 1. Run in the default loop's executor:
result = await loop.run_in_executor(None, blocking_io)
print('default thread pool', result)

I move the await and change the code to:

# 1. Run in the default loop's executor, but don't await it yet:
thread = loop.run_in_executor(None, blocking_io)
while True:
    await asyncio.sleep(1)
    await websocket.send_text('status updates...')
    # internal_logger.blocking_stuff_finished is the flag my blocking code
    # sets when it is done (shared memory, since it runs in a thread)
    if internal_logger.blocking_stuff_finished:
        break
result = await thread  # already finished, this just fetches the return value
await websocket.send_text(f'result: {result}')
await websocket.close()

This way my CPU-bound stuff runs in a separate thread that I am not awaiting right away, and everything works fine.
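
Putting the pieces together, a minimal self-contained version of this pattern could look like the sketch below. It is only a sketch: cpu_bound() is a placeholder for the real work, and future.done() is used as the termination check instead of the internal_logger flag.

# Sketch of the pattern above; cpu_bound() is a placeholder for the real work.
import asyncio
import time

from fastapi import FastAPI, WebSocket

app = FastAPI()


def cpu_bound(parameters: str) -> int:
    # Stand-in for the long-running, CPU-bound algorithm.
    total = 0
    for i in range(5):
        time.sleep(1)  # pretend to crunch numbers
        total += i
    return total


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    parameters = await websocket.receive_text()

    loop = asyncio.get_running_loop()
    # Schedule the blocking work in the default thread pool without awaiting it yet.
    future = loop.run_in_executor(None, cpu_bound, parameters)

    # The event loop stays free, so we can keep pushing status updates.
    while not future.done():
        await websocket.send_text('status update...')
        await asyncio.sleep(1)

    result = await future  # already finished; this just retrieves the value
    await websocket.send_text(f'result: {result}')
    await websocket.close()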

Making a custom thread pool also works, but we would need to remove the context manager to make it non-blocking.

# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
    result = await loop.run_in_executor(pool, blocking_io)

which would then become:

pool = concurrent.futures.ThreadPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)
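
Since the with block no longer takes care of cleanup, the pool should eventually be shut down once the work has been awaited, for example:

result = await thread
pool.shutdown()  # release the worker threads the context manager would have cleaned up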

In theory, the same would work with a ProcessPoolExecutor, but more work would be needed, since there is no shared memory and my termination condition would not work as described above.
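
As a rough sketch (reusing the loop and blocking_io names from above), the swap itself would just be:

pool = concurrent.futures.ProcessPoolExecutor()
thread = loop.run_in_executor(pool, blocking_io)  # blocking_io now runs in a child process

but blocking_io and its arguments would have to be picklable, and the termination signal would need real inter-process communication (e.g. a multiprocessing queue) instead of a shared flag.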

And yes, I know that CPU-bound stuff should preferably be done in a different process, but moving it to a separate thread does work in my case, and I do enjoy the shared memory atm.
