Websockets bridge for audio stream in FastAPI

Problem description

Objective

My objective is to consume an audio stream. Logically, the flow I want is:

  1. Audio stream comes through WebSocket A (FastAPI endpoint)
  2. Audio stream is bridged to a different WebSocket, B, which will return a JSON (Rev-ai's WebSocket)
  3. JSON results are sent back through WebSocket A in real time, i.e. while the audio stream is still coming in.

Possible solution

To solve this problem, I've had quite a few ideas, but ultimately I've been trying to bridge WebSocket A to WebSocket B. My attempt so far involves a ConnectionManager class, which contains a queue.Queue. The chunks of the audio stream are added to this queue so that we do not consume directly from WebSocket A.

The ConnectionManager also contains a generator method to yield all values from the queue.
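A minimal sketch of such a ConnectionManager, for reference. The method names connect, add_to_buffer and disconnect match the endpoint shown next; the generator method's name (generator), the single-connection design and the None end-of-stream sentinel are assumptions, not the original class. The WebSocket is typed as Any to keep the sketch free of FastAPI imports. Because queue.Queue.get() blocks, the generator is meant to be consumed from a worker thread, not from the event loop.

```python
import queue
from typing import Any, Iterator, Optional


class ConnectionManager:
    """Hypothetical sketch: buffers audio chunks from WebSocket A in a thread-safe queue."""

    def __init__(self) -> None:
        self.websocket: Optional[Any] = None  # the FastAPI WebSocket (typed as Any here)
        self.buffer: "queue.Queue[Optional[bytes]]" = queue.Queue()

    async def connect(self, websocket: Any) -> None:
        await websocket.accept()
        self.websocket = websocket

    def add_to_buffer(self, chunk: bytes) -> None:
        self.buffer.put(chunk)

    def disconnect(self) -> None:
        self.websocket = None
        self.buffer.put(None)  # sentinel: tells generator() the stream has ended

    def generator(self) -> Iterator[bytes]:
        # queue.get() blocks, so iterate this from a worker thread, not the event loop
        while True:
            chunk = self.buffer.get()
            if chunk is None:
                return
            yield chunk
```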

My FastAPI implementation consumes from websocket A like this:

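from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
# manager is an instance of the ConnectionManager class described above


@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except WebSocketDisconnect:  # raised when the client closes the connection
        manager.disconnect()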

Concurrently with this ingestion, I'd like a task that bridges our audio stream to WebSocket B and sends the obtained values back through WebSocket A. The audio stream could be consumed through the aforementioned generator method.

The generator method is necessary due to how WebSocket B consumes messages, as shown in Rev-ai's examples:

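from rev_ai.streamingclient import RevAiStreamingClient

# access_token, config and MEDIA_GENERATOR come from the rest of the Rev.ai example
streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return this value through websocket A
    print(response)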

This is one of the biggest challenges, since we need to feed data into a generator and get the results back in real time.

Latest attempts

I've been trying my luck with asyncio; from what I understand, one possibility would be to create a coroutine that runs in the background. I haven't succeeded with this yet, but it sounded promising.

I've thought about triggering this through the FastAPI startup event, but I'm having trouble achieving concurrency. I tried to work with event loops directly, but that gave me an error related to nested event loops.
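The nested-event-loop error is most likely the RuntimeError raised when a coroutine is driven with loop.run_until_complete() (or asyncio.run()) from code that is already running inside an event loop, which is always the case inside a FastAPI endpoint or startup handler. Below is a minimal, stdlib-only illustration. The names background_consumer and handler are hypothetical, and an asyncio.Queue stands in for WebSocket A. The sketch shows the failing call first, then asyncio.create_task(), which runs the background coroutine on the already-running loop alongside the ingestion loop.

```python
import asyncio
from typing import List, Tuple


async def background_consumer(q: asyncio.Queue, results: List[str]) -> None:
    # Consumes chunks concurrently with whoever produces them; None = end of stream
    while (chunk := await q.get()) is not None:
        results.append(f"processed {chunk!r}")


async def handler() -> Tuple[str, List[str]]:
    q: asyncio.Queue = asyncio.Queue()
    results: List[str] = []
    loop = asyncio.get_running_loop()

    # Driving a coroutine with run_until_complete from inside a running loop fails
    error = ""
    coro = background_consumer(q, results)
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        error = str(exc)
    finally:
        coro.close()  # never started; close it to avoid a "never awaited" warning

    # Scheduling it as a task on the already-running loop works
    task = asyncio.create_task(background_consumer(q, results))
    for chunk in (b"a", b"b"):
        await q.put(chunk)  # stands in for `await websocket.receive_bytes()`
    await q.put(None)
    await task
    return error, results
```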

Caveat

FastAPI is optional if you think a different approach is better, and to some extent so is WebSocket A. At the end of the day, the ultimate objective is to receive an audio stream through our own API endpoint, run it through Rev.ai's WebSocket, do some extra processing, and send the results back.

Solution

Bridge for websocket <-> websocket

Below is a simple example of a websocket proxy, where websocket A and websocket B are both endpoints in the FastAPI app; websocket B can live anywhere else, just change its address in ws_b_uri. The websockets library is used as the websocket client.

To forward the data, the A endpoint starts two tasks, forward and reverse, and waits for their completion with asyncio.gather(). Data is transferred in both directions concurrently.

import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets

app = FastAPI()

# Address of websocket B; point this at any other websocket server
ws_b_uri = "ws://localhost:8001/ws_b"


# A -> B: relay every binary message received on websocket A to websocket B
async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await ws_b.send(data)


# B -> A: relay every message received from websocket B back to websocket A
async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
        await asyncio.gather(fwd_task, rev_task)


# Stand-in for websocket B: answers every chunk with a fixed JSON string
@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server received: ", data)
        await ws_b_server.send_text('{"response": "value from B server"}')
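asyncio.gather() in the proxy above does not cancel the remaining task when one direction fails: when client A disconnects, forward() raises WebSocketDisconnect, the error escapes the endpoint, and reverse() is left to fail on its own later. A common alternative is to return as soon as either direction finishes and cancel the other one. The sketch below shows that pattern with asyncio.Queue objects standing in for the two websockets and None standing in for a closed connection (both assumptions, to keep it self-contained). In the FastAPI version, forward() and reverse() would instead catch WebSocketDisconnect and websockets.exceptions.ConnectionClosed respectively and return.

```python
import asyncio
from typing import Optional


async def pump(src: asyncio.Queue, dst: asyncio.Queue, label: str) -> str:
    # Copy messages from src to dst; None plays the role of "connection closed"
    while True:
        msg: Optional[bytes] = await src.get()
        if msg is None:
            return f"{label} closed"
        await dst.put(msg)


async def bridge(a_in: asyncio.Queue, b_out: asyncio.Queue,
                 b_in: asyncio.Queue, a_out: asyncio.Queue) -> str:
    fwd = asyncio.create_task(pump(a_in, b_out, "A"))  # A -> B
    rev = asyncio.create_task(pump(b_in, a_out, "B"))  # B -> A
    # Unlike gather(), return as soon as either direction finishes
    done, pending = await asyncio.wait({fwd, rev}, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let the cancellations complete so no task outlives the bridge
    await asyncio.gather(*pending, return_exceptions=True)
    # result() re-raises if the finished task failed, so errors are not lost
    return done.pop().result()
```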

Update (Bridge websocket <-> sync generator)

Given the latest update to the question, WebSocket B is hidden behind synchronous generators (two of them, in fact: one for the input and one for the output), so the task becomes building a bridge between a WebSocket and a synchronous generator. Since I have never worked with the rev-ai library, I wrote a stub function, stream_client_start, in place of streamclient.start; it takes a generator (MEDIA_GENERATOR in the original) and returns a generator (response_generator in the original).

Here, I run the generator processing in a separate thread via run_in_executor. To avoid reinventing the wheel, the thread and the event loop communicate through queues from the janus library, which provides a queue with both a synchronous and an asynchronous interface. Accordingly, there are two queues: one for A -> B and one for B -> A.


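import asyncio
import queue
import time
from typing import Generator

import janus
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState

app = FastAPI()

STOP = None  # sentinel marking the end of a stream in either queue


# Stub generator function (the real client uses websocket B internally)
def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
        time.sleep(1)  # simulate processing latency
        yield f"Get {chunk}"


# Adapter: queue -> generator; finishes when the STOP sentinel arrives
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
        item = sync_queue.get()
        if item is STOP:
            return
        yield item


# A -> B: audio chunks from websocket A into the worker thread's input queue
async def forward(ws_a: WebSocket, queue_b):
    try:
        while True:
            data = await ws_a.receive_bytes()
            print("websocket A received:", data)
            await queue_b.put(data)
    except WebSocketDisconnect:
        await queue_b.put(STOP)  # ends the input generator so the worker thread can exit


# B -> A: results from the worker thread back to websocket A
async def reverse(ws_a: WebSocket, queue_b):
    while True:
        data = await queue_b.get()
        if data is STOP:
            return
        if ws_a.client_state != WebSocketState.CONNECTED:
            continue  # client already gone: drain remaining results without sending
        await ws_a.send_text(data)
        print("websocket A sent:", data)


# Runs in a worker thread: blocking generators on both ends
def process_b_client(fwd_queue, rev_queue):
    try:
        response_generator = stream_client_start(queue_to_generator(fwd_queue))
        for r in response_generator:
            rev_queue.put(r)
    finally:
        rev_queue.put(STOP)  # tells reverse() that no more results will come


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    loop = asyncio.get_running_loop()
    # janus queues must be created while the event loop is running
    fwd_queue = janus.Queue()  # A -> B
    rev_queue = janus.Queue()  # B -> A
    await ws_a.accept()

    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)

    fwd_queue.close()
    rev_queue.close()
    await fwd_queue.wait_closed()
    await rev_queue.wait_closed()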
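If you would rather not depend on janus, the same bridge can be sketched with the standard library alone. This is a swapped-in technique, not what the answer above uses. A plain queue.Queue carries chunks A -> B into the worker thread, where an unbounded put() never blocks the loop. Results travel B -> A through an asyncio.Queue, and each put is handed to the loop thread with loop.call_soon_threadsafe(), because asyncio.Queue is not thread-safe. The structure mirrors the janus version. The chunks list and the sent list stand in for WebSocket A, and the None sentinel is an assumption; for real use, swap in ws_a.receive_bytes() and ws_a.send_text().

```python
import asyncio
import queue
import time
from typing import Iterator, List, Optional

STOP = None  # sentinel: end of stream, used in both directions


# Same stub as in the answer: stands in for the blocking streamclient.start
def stream_client_start(input_gen: Iterator[bytes]) -> Iterator[str]:
    for chunk in input_gen:
        time.sleep(0.01)  # simulated latency
        yield f"Get {chunk}"


def queue_to_generator(sync_q: "queue.Queue[Optional[bytes]]") -> Iterator[bytes]:
    while (item := sync_q.get()) is not STOP:
        yield item


def process_b_client(fwd_q: queue.Queue, rev_q: asyncio.Queue,
                     loop: asyncio.AbstractEventLoop) -> None:
    # Runs in a worker thread
    try:
        for result in stream_client_start(queue_to_generator(fwd_q)):
            # asyncio.Queue is not thread-safe: hand each put over to the loop thread
            loop.call_soon_threadsafe(rev_q.put_nowait, result)
    finally:
        loop.call_soon_threadsafe(rev_q.put_nowait, STOP)


async def forward(chunks: List[bytes], fwd_q: queue.Queue) -> None:
    for chunk in chunks:  # stands in for `await ws_a.receive_bytes()`
        fwd_q.put(chunk)  # unbounded, so put() never blocks the event loop
        await asyncio.sleep(0)
    fwd_q.put(STOP)  # stands in for handling WebSocketDisconnect


async def reverse(rev_q: asyncio.Queue, sent: List[str]) -> None:
    while (item := await rev_q.get()) is not STOP:
        sent.append(item)  # stands in for `await ws_a.send_text(item)`


async def bridge(chunks: List[bytes]) -> List[str]:
    loop = asyncio.get_running_loop()
    fwd_q: queue.Queue = queue.Queue()  # A -> B, crosses into the worker thread
    rev_q: asyncio.Queue = asyncio.Queue()  # B -> A, touched only on the loop thread
    sent: List[str] = []
    await asyncio.gather(
        loop.run_in_executor(None, process_b_client, fwd_q, rev_q, loop),
        forward(chunks, fwd_q),
        reverse(rev_q, sent),
    )
    return sent
```

For example, asyncio.run(bridge([b"a", b"b"])) returns ["Get b'a'", "Get b'b'"].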
