Websockets - Stream audio in, JSON out with FastAPI and two websockets

Question

My objective is to consume an audio stream. Logically, this is what I want to happen:

  1. The audio stream comes in through WebSocket A (a FastAPI endpoint).
  2. The audio stream is bridged to a different WebSocket, B, which returns JSON (Rev-ai's WebSocket).
  3. The JSON results are sent back through WebSocket A in real time, that is, while the audio stream is still coming in.

Possible solution

To solve this problem, I've had quite a few ideas, but ultimately I've been trying to bridge WebSocket A to WebSocket B. My attempt so far involves a ConnectionManager class, which contains a queue.Queue. The chunks of the audio stream are added to this queue so that we do not consume directly from WebSocket A.

The ConnectionManager also contains a generator method that yields all values from the queue.
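
The question does not show the ConnectionManager itself, so the following is only a minimal sketch of what such a class might look like. The method names close and media_generator, and the use of None as an end-of-stream sentinel, are assumptions made for illustration; the connect and disconnect methods used by the endpoint are left out.

```python
import queue
from typing import Iterator


class ConnectionManager:
    """Hypothetical buffer between WebSocket A and a generator-based client."""

    def __init__(self) -> None:
        # Thread-safe FIFO; None is used here as an assumed end-of-stream marker.
        self._buffer: queue.Queue = queue.Queue()

    def add_to_buffer(self, chunk: bytes) -> None:
        # Unbounded queue, so put() returns immediately.
        self._buffer.put(chunk)

    def close(self) -> None:
        # Signal the generator that no more audio will arrive.
        self._buffer.put(None)

    def media_generator(self) -> Iterator[bytes]:
        # Blocks on get(), so this must be consumed in a worker thread,
        # not on the event loop thread.
        while True:
            chunk = self._buffer.get()
            if chunk is None:
                return
            yield chunk


manager = ConnectionManager()
manager.add_to_buffer(b"chunk-1")
manager.add_to_buffer(b"chunk-2")
manager.close()
chunks = list(manager.media_generator())
```

Because the generator blocks while the queue is empty, iterating it directly inside an async endpoint would stall every other connection served by the same event loop.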

My FastAPI implementation consumes from WebSocket A like this:

@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()

Concurrently with this ingestion, I'd like to have a task that bridges the audio stream to WebSocket B and sends the values it obtains back to WebSocket A. The audio stream could be consumed through the aforementioned generator method.

The generator method is necessary because of how WebSocket B consumes messages, as shown in Rev-ai's examples:

streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)

This is one of the biggest challenges, as we need to be feeding data into a generator and getting the results in real time.

I've been trying my luck with asyncio; from what I understand, one possibility would be to create a coroutine that runs in the background. I've been unsuccessful with this, but it sounded promising.

I've thought about triggering this through the FastAPI startup event, but I'm having trouble achieving concurrency. I tried to use event loops, but that gave me a nested event loop related error.
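
The exact error is not quoted, so the following sketch only assumes it was of the common kind: calling run_until_complete on a loop that is already running, which is the situation inside any FastAPI handler. The names background_job and handler are hypothetical. The sketch shows that call failing with a RuntimeError and the same work being scheduled with asyncio.create_task instead.

```python
import asyncio
from typing import List, Optional, Tuple


async def background_job(results: List[int]) -> None:
    # Stand-in for a long-running bridge task.
    for i in range(3):
        await asyncio.sleep(0)
        results.append(i)


async def handler() -> Tuple[Optional[str], List[int]]:
    results: List[int] = []
    error: Optional[str] = None
    loop = asyncio.get_running_loop()

    # Driving the already-running loop from inside a coroutine is rejected.
    coro = background_job(results)
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        error = type(exc).__name__
        coro.close()  # the coroutine never started; close it to avoid a warning

    # Scheduling a task on the running loop works and runs concurrently.
    task = asyncio.create_task(background_job(results))
    await task
    return error, results


error, results = asyncio.run(handler())
```

Inside an endpoint, the task would typically be created once per connection and awaited or cancelled when the connection closes.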

FastAPI can be optional if your insight deems so, and in a way so is WebSocket A. At the end of the day, the ultimate objective is to receive an audio stream through our own API endpoint, run it through Rev.ai's WebSocket, do some extra processing, and send the results back.

Recommended answer

Bridge for websocket <-> websocket

Below is a simple example of a websocket proxy, where websocket A and websocket B are both endpoints in the FastAPI app; websocket B could be located anywhere else, just change its address ws_b_uri. The websockets library is used for the websocket client.

To perform data forwarding, the code of the A endpoint starts two tasks, forward and reverse, and waits for their completion by means of asyncio.gather() (https://docs.python.org/3/library/asyncio-task.html#asyncio.gather). Data transfer in both directions proceeds concurrently.

import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()

ws_b_uri = "ws://localhost:8001/ws_b"


async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await ws_b.send(data)


async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
        await asyncio.gather(fwd_task, rev_task)


@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server received: ", data)
        await ws_b_server.send_text('{"response": "value from B server"}')
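
The forward/reverse pattern can also be tried without any sockets. The sketch below is an illustration only: asyncio.Queue objects stand in for both websockets, fake_b is a hypothetical stand-in for the B server, and None is an assumed end-of-stream marker so that all three coroutines finish and gather returns.

```python
import asyncio
import json
from typing import List


async def forward(source: asyncio.Queue, upstream: asyncio.Queue) -> None:
    # A -> B: pass every chunk on, including the end-of-stream marker.
    while True:
        chunk = await source.get()
        await upstream.put(chunk)
        if chunk is None:
            return


async def fake_b(upstream: asyncio.Queue, downstream: asyncio.Queue) -> None:
    # Hypothetical B server: answers each audio chunk with a JSON string.
    while True:
        chunk = await upstream.get()
        if chunk is None:
            await downstream.put(None)
            return
        await downstream.put(json.dumps({"bytes": len(chunk)}))


async def reverse(downstream: asyncio.Queue, sent: List[str]) -> None:
    # B -> A: collect what would be sent back over websocket A.
    while True:
        message = await downstream.get()
        if message is None:
            return
        sent.append(message)


async def main() -> List[str]:
    source: asyncio.Queue = asyncio.Queue()
    upstream: asyncio.Queue = asyncio.Queue()
    downstream: asyncio.Queue = asyncio.Queue()
    sent: List[str] = []
    for chunk in (b"ab", b"cde", None):
        source.put_nowait(chunk)
    # All three coroutines make progress concurrently on one event loop.
    await asyncio.gather(
        forward(source, upstream),
        fake_b(upstream, downstream),
        reverse(downstream, sent),
    )
    return sent


sent = asyncio.run(main())
```

With real sockets there is no sentinel; the loops end when one side disconnects and the corresponding receive call raises.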

Update (Bridge websocket <-> sync generator)

Considering the last update of the question, the issue is that WebSocket B is hidden behind a synchronous generator (in fact two of them, one for the input and one for the output), so the task becomes how to build a bridge between a WebSocket and a synchronous generator. Since I have never worked with the rev-ai library, I made a stub function stream_client_start to stand in for streamclient.start; it takes a generator (MEDIA_GENERATOR in the original) and returns a generator (response_generator in the original).

In this case, I start the processing of the generators in a separate thread through run_in_executor, and, so as not to reinvent the wheel, I use a queue from the janus library for communication, which lets you connect synchronous and asynchronous code through a queue. Accordingly, there are two queues, one for A -> B and the other for B -> A.

import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue

app = FastAPI()


# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
        time.sleep(1)
        yield f"Get {chunk}"


# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
        yield sync_queue.get()


async def forward(ws_a: WebSocket, queue_b):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await queue_b.put(data)


async def reverse(ws_a: WebSocket, queue_b):
    while True:
        data = await queue_b.get()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


def process_b_client(fwd_queue, rev_queue):
    response_generator = stream_client_start(queue_to_generator(fwd_queue))
    for r in response_generator:
        rev_queue.put(r)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    loop = asyncio.get_event_loop()
    fwd_queue = janus.Queue()
    rev_queue = janus.Queue()
    await ws_a.accept()

    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)
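
In the code above, queue_to_generator loops forever, so the worker thread does not exit on its own when the client goes away. One possible way to handle that is an end-of-stream sentinel. The sketch below is a stdlib-only variant under that assumption, not the janus-based approach itself: a plain queue.Queue carries A -> B, and loop.call_soon_threadsafe pushes results into an asyncio.Queue for B -> A. The _STOP sentinel and the main driver are made up for illustration.

```python
import asyncio
import queue
from typing import Iterator, List

_STOP = object()  # assumed end-of-stream sentinel


def stream_client_start(input_gen: Iterator[bytes]) -> Iterator[str]:
    # Stub for the generator-based client, as in the answer (no sleep here).
    for chunk in input_gen:
        yield f"Get {chunk!r}"


def queue_to_generator(sync_queue: queue.Queue) -> Iterator[bytes]:
    # Ends when the sentinel arrives, so the worker thread can finish.
    while True:
        item = sync_queue.get()
        if item is _STOP:
            return
        yield item


def process_b_client(fwd_queue: queue.Queue, rev_queue: asyncio.Queue,
                     loop: asyncio.AbstractEventLoop) -> None:
    # Runs in a worker thread; hands results to the loop thread safely.
    for response in stream_client_start(queue_to_generator(fwd_queue)):
        loop.call_soon_threadsafe(rev_queue.put_nowait, response)
    loop.call_soon_threadsafe(rev_queue.put_nowait, _STOP)


async def main() -> List[str]:
    loop = asyncio.get_running_loop()
    fwd_queue: queue.Queue = queue.Queue()
    rev_queue: asyncio.Queue = asyncio.Queue()
    worker = loop.run_in_executor(None, process_b_client, fwd_queue, rev_queue, loop)
    # In the endpoint these puts would happen in forward() as audio arrives.
    for chunk in (b"a", b"b"):
        fwd_queue.put(chunk)
    fwd_queue.put(_STOP)  # e.g. when websocket A disconnects
    responses: List[str] = []
    while True:
        item = await rev_queue.get()
        if item is _STOP:
            break
        responses.append(item)
    await worker
    return responses


responses = asyncio.run(main())
```

The unbounded queue.Queue never blocks on put, which is why it is safe to call from the event loop here; a bounded queue would need a different hand-off.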
