是否可以检测到所有异步任务何时暂停? [英] Is it possible to detect when all async tasks are suspended?

查看:28
本文介绍了是否可以检测到所有异步任务何时暂停?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试测试异步代码,但由于某些任务之间的复杂连接,我遇到了麻烦.

I'm trying to test an async code, but I'm having trouble because of the complex connection between some tasks.

我需要的上下文是一些代码,它在读取文件的同时读取另一个进程正在写入的文件.代码中有一些逻辑,其中读取截断的记录将使其退回并 wait()asyncio.Condition 上稍后由 inotify 事件.当另一个进程完成以后的写入时,此代码应该通过重新读取记录来使其恢复.我特别想测试这个恢复是否有效.

The context I need this is some code which reads a file in parallel to it being written by another process. There's some logic in the code where reading a truncated record will make it back off and wait() on an asyncio.Condition to be later released by an inotify event. This code should let it recover by re-reading the record when a future write has been completed by another process. I specifically want to test that this recovery works.

所以我的计划是:

  • 编写部分文件
  • 运行事件循环,直到它根据条件挂起
  • 编写文件的其余部分
  • 运行事件循环直到完成

我以为这是答案:检测空闲的异步事件循环

然而,试用测试表明它退出得太早了:

However a trial test shows that it exits too soon:

import asyncio
import random


def test_ping_pong():
    async def ping_pong(idx: int, oth_idx: int):
        for i in range(random.randint(100, 1000)):
            counters[idx] += 1
            async with conditions[oth_idx]:
                conditions[oth_idx].notify()
            async with conditions[idx]:
                await conditions[idx].wait()

    async def detect_iowait():
        loop = asyncio.get_event_loop()
        rsock, wsock = socket.socketpair()
        wsock.close()
        try:
            await loop.sock_recv(rsock, 1)
        finally:
            rsock.close()
    conditions = [asyncio.Condition(), asyncio.Condition()]
    counters = [0, 0]


    loop = asyncio.get_event_loop()
    loop.create_task(ping_pong(0, 1))
    loop.create_task(ping_pong(1, 0))
    loop.run_until_complete(loop.create_task(detect_iowait()))

    assert counters[0] > 10
    assert counters[1] > 10

推荐答案

在挖掘 Python 事件循环的源代码后,我发现没有任何公开的内容可以公开做到这一点.

After digging through the source code for python's event loops, I've found nothing exposed that can do this publicly.

然而,可以使用由 BaseEventLoop 创建的 _ready 双端队列.请参阅此处.这包含可以立即运行的每个任务.当任务运行时,它会从 _ready 双端队列中弹出.当一个挂起的任务被另一个任务释放时(例如通过调用 future.set_result()) 挂起的任务会立即添加回双端队列.这从 python 3.5 开始就存在了.

It is however possible to use the _ready deque created by the BaseEventLoop. See here. This contains every task that is immediately ready to run. When a task is run it is popped from the _ready deque. When a suspended task is released by another task (eg by calling future.set_result()) the suspended task is immediately added back to the deque. This has existed since python 3.5.

您可以做的一件事是重复注入回调以检查 _ready 中有多少项.当所有其他任务被挂起时,回调运行时dqueue中将没有任何东西.

One thing that you can do is repeatedly inject a callback to check how many items in _ready. When all other tasks are suspended, there will be nothing left in the dqueue at the moment the callback runs.

回调将在事件循环的每次迭代中最多运行一次:

The callback will run at most once per iteration of the event loop:

async def wait_for_deadlock(empty_loop_threshold: int = 0):
    def check_for_deadlock():
        nonlocal empty_loop_count

        # pylint: disable=protected-access
        if loop._ready:
            empty_loop_count = 0
            loop.call_soon(check_for_deadlock)
        elif empty_loop_count < empty_loop_threshold:
            empty_loop_count += 1
            loop.call_soon(check_for_deadlock)
        else:
            future.set_result(None)

    empty_loop_count = 0
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    asyncio.get_running_loop().call_soon(check_for_deadlock)
    await future

在上面的代码中,empty_loop_threshold 在大多数情况下并不是真正必要的,但在任务与 IO 通信的情况下存在.例如,如果一个任务通过 IO 与另一个任务进行通信,则可能会出现所有任务暂停的时刻,即使一个任务已准备好读取数据.设置 empty_loop_threshold = 1 应该可以解决这个问题.

In the above code the empty_loop_threshold is not really necessary in most cases but exists for cases where tasks communicate with IO. For example if one task communicates to another through IO, there may be a moment where all tasks are suspended even through one has data ready to read. Setting empty_loop_threshold = 1 should get round this.

使用这个比较简单.您可以:

Using this is relatively simple. You can:

loop.run_until_complete(wait_for_deadlock())

或者按照我的问题要求:

Or as requested in my question:

def some_test():
    async def async_test():
        await wait_for_deadlock()
        inject_something()
        await wait_for_deadlock()
            

    loop = loop.get_event_loop()
    loop.create_task(task_to_test())
    loop.run_until_complete(loop.create_task(async_test)
    assert something

这篇关于是否可以检测到所有异步任务何时暂停?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆