在 Python 3 中使用 asyncio 和 websockets 的长时间延迟 [英] Long delay in using asyncio and websockets in Python 3

查看:41
本文介绍了在 Python 3 中使用 asyncio 和 websockets 的长时间延迟的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在处理从 websocket 服务器推送到我的 client.py 的数据时,我遇到了很长的(3 小时)延迟(延迟起初很短暂,然后在一天中变得更长).我知道不是服务器延迟的.

I am experiencing a long (3-hour) delay ( the delay is brief at first and then gets longer throughout the day) in processing data pushed from a websocket server to my client.py. I know that it is not delayed by the server.

例如,我每 5 秒看到一次 keep_alive 日志事件及其各自的时间戳.这样就顺利运行了.但是当我看到在日志中处理的数据帧实际上是在服务器发送它之后 3 小时.我是否正在做一些事情来延迟这个过程?

For example every 5 seconds I see the keep_alive log-event and its respective timestamp. So that runs smoothly. But when I see a data frame processed in logs is actually 3 hours after when the server sent it. Am I doing something to delay this process?

我是否正确地调用了我的协程keep_alive"?keep_alive 只是发送给服务器的消息,以保持连接处于活动状态.服务器回显消息.我也记录太多了吗?这可能会延迟处理(我不这么认为,因为我看到记录事件立即发生).

Am I calling my coroutine 'keep_alive' correctly? keep_alive is just a message to the server to keep the connection alive. The server echos the message back. Also am I logging too much? Could that be delaying the processing (I don't think so since I'm seeing the logging events occur right away).

async def keep_alive(websocket):
                """
                 This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
                """
                await websocket.send('Hello')   
                await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
    rotating_logger.info (e)

文档 - 我认为这可能与它有关 - 但我没有把这些点联系起来.

From the documentation - I'm thinking that this might have something to do with it - but I haven't connected the dots.

max_queue 参数设置队列的最大长度保存传入的消息.默认值为 32.0 禁用限制.收到消息后,将其添加到内存队列中;然后 recv() 从该队列中弹出.为了防止内存过大当消息接收速度快于它们时的消耗处理后,队列必须是有界的.如果队列已满,则协议停止处理传入数据,直到调用 recv().在在这种情况下,各种接收缓冲区(至少在 asyncio 和操作系统)将填满,然后 TCP 接收窗口将缩小,减慢向下传输,避免丢包.

The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32. 0 disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.

编辑 2018 年 9 月 28 日:我正在测试它没有保持活动消息,这似乎不是问题.它可能与 convert_and_store() 函数有关吗?这是否需要异步定义然后等待?

EDIT 9/28/2018: I'm testing it without the keep-alive message and that doesn't seem to be the issue. Could it be related to the convert_and_store() function? Does this need to be async def and then awaited as well?

def convert_and_store(data, divert = False, test = False):
    if test:
        data = b
    fields = data.keys()
    file_name =  parse_call_type(data, divert = divert)
    json_to_csv(data, file_name, fields)

EDIT 10/1/2018:keep-alive 消息和 convert_and_store 似乎都有问题;如果我将 keep-alive 消息延长到 60 秒 - 那么 convert_and_store 将每 60 秒只运行一次.所以 convert_and_store 正在等待 keep_alive()...

EDIT 10/1/2018: It seems that both the keep-alive message and convert_and_store are both at issue; if I extend the keep-alive message to 60 seconds - then convert_and_store will run only once per 60 seconds. So convert_and_store is waiting on the keep_alive()...

推荐答案

会不会跟convert_and_store()函数有关?

Could it be related to the convert_and_store() function?

是的,可能是.不应直接调用阻塞代码.如果一个函数执行 CPU 密集型计算 1 秒,所有 asyncio 任务和 IO 操作都会延迟 1 秒.

Yes, it could be. Blocking code should not be called directly. If a function performs a CPU-intensive calculation for 1 second, all asyncio Tasks and IO operations would be delayed by 1 second.

执行器可用于在不同的线程/进程中运行阻塞代码:

An executor can be used to run a blocking code in a different thread/process:

import asyncio
import concurrent.futures
import time

def long_runned_job(x):
    time.sleep(2)
    print("Done ", x)

async def test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for i in range(5):
            loop.run_in_executor(pool, long_runned_job, i)
            print(i, " is runned")
            await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

在你的情况下,它应该是这样的:

In your case it should look something like this:

import concurrent.futures

async def open_connection_test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async with websockets.connect(...) as websocket:
            while True:    
                ...
                loop.run_in_executor(pool, convert_and_store, args)

已编辑:

keep-alive 消息和 convert_and_store 似乎都有问题

It seems that both the keep-alive message and convert_and_store are both at issue

你可以在后台运行keep_alive:

async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)   
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    asyncio.ensure_future(keep_alive(websocket))
    while True:    
        ...

这篇关于在 Python 3 中使用 asyncio 和 websockets 的长时间延迟的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆