使用Asyncio实现非阻塞WebSocket接收 [英] Non-Blocking Websocket Receive with Asyncio

查看:25
本文介绍了使用Asyncio实现非阻塞WebSocket接收的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个Python-3程序,该程序尝试做两件事: (1)从外部WebSocket读取数据(类型1)(非阻塞)和 (2)在常规UDP套接字(非阻塞)上接收数据(类型2)

在很长一段时间内,WebSocket和UDP套接字上都没有数据。因此,我正在尝试使这两种数据类型的读取/接收都是非阻塞的。我正在尝试使用Asyncio和WebSockets对WebSocket执行此操作。

不幸的是,只要WebSocket上没有数据(类型1),下面的代码就会挂起。它会阻止代码的睡觉执行。我做错了什么?

提前感谢您的所有帮助。

import asyncio
import websockets
import socket

IP_STRATUX =    "ws://192.168.86.201/traffic"

# Method to retrieve data (Type 1) from websocket
async def readWsStratux(inURL):
    async with websockets.connect(inURL, close_timeout=0.1) as websocket:
        try:
            data = await websocket.recv()
            return data
        except websocket.error:
            return None

if __name__ == "__main__":
    # Socket to receive data (Type 2)
    sockPCC = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sockPCC.bind((PCC_IP, PCC_PORT))
    sockPCC.setblocking(0)

    while True:
        print('-----MAIN LOOP-----')
        data1 = asyncio.get_event_loop().run_until_complete(
            readWsStratux(IP_STRATUX))
        print(f'Data 1: {data1}')
        data2, addr = sockPCC.recvfrom(1024)
        print(f'Data 2: {data2}')

推荐答案

问题在于run_until_complete言行一致,即运行提供的协程,直到它返回。您需要创建一个协同程序,该协同程序生成两个独立的任务,每个任务都在"后台"运行自己的协同程序。一个任务将处理从WebSocket读取数据,另一个任务将处理UDP数据。这两个协同例程都可以馈送您的主协同例程从中读取的队列。

WebSocket协程看起来与您已有的非常相似,但是无限循环被推入协程,数据传输到调用者提供的队列中:

async def readWsStratux(inURL, queue):
    while True:
        async with websockets.connect(inURL, close_timeout=0.1) as ws:
            try:
                data = await ws.recv()
                await queue.put(('websocket', data))
            except websockets.error:
                return None

下一步,您将需要一个类似的执行UDP的协程。与其手动创建非阻塞套接字,不如使用异步的support for UDP。您可以从文档中的example class的简化版本开始:

class ClientProtocol:
    def __init__(self, queue):
        self.queue = queue

    def datagram_received(self, data, addr):
        self.queue.put_nowait(('udp', data))

    def connection_lost(self, exc):
        self.queue.put_nowait(('udp', b''))

async def read_udp(queue):
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: ClientProtocol(queue),
        remote_addr=(PCC_IP, PCC_PORT))
    # wait until canceled
    try:
        await asyncio.get_event_loop().create_future()
    except asyncio.CancelledError:
        transport.close()
        raise

准备好这两个之后,您可以编写派生它们的主协程,并在它们运行时从队列中收集数据:

async def read_both(in_url):
    queue = asyncio.Queue()
    # spawn two workers in parallel, and have them send
    # data to our queue
    ws_task = asyncio.create_task(readWsStratux(in_url, queue))
    udp_task = asyncio.create_task(read_udp(queue))

    while True:
        source, data = await queue.get()
        if source == 'ws':
            print('from websocket', data)
        elif source == 'udp':
            if data == b'':
                break  # lost the UDP connection
            print('from UDP', data)

    # terminate the workers
    ws_task.cancel()
    udp_task.cancel()

您的主程序现在由read_both

的简单调用组成
if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(read_both(IP_STRATUX))

请注意,以上代码未经测试,可能包含打字错误。

这篇关于使用Asyncio实现非阻塞WebSocket接收的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆