是否可以检测到所有异步任务何时暂停? [英] Is it possible to detect when all async tasks are suspended?
问题描述
我正在尝试测试异步代码,但由于某些任务之间的复杂连接,我遇到了麻烦.
I'm trying to test an async code, but I'm having trouble because of the complex connection between some tasks.
我需要的上下文是一些代码,它在读取文件的同时读取另一个进程正在写入的文件.代码中有一些逻辑,其中读取截断的记录将使其退回并 wait()
在 asyncio.Condition
上稍后由 inotify 事件.当另一个进程完成以后的写入时,此代码应该通过重新读取记录来使其恢复.我特别想测试这个恢复是否有效.
The context I need this is some code which reads a file in parallel to it being written by another process. There's some logic in the code where reading a truncated record will make it back off and wait()
on an asyncio.Condition
to be later released by an inotify event. This code should let it recover by re-reading the record when a future write has been completed by another process. I specifically want to test that this recovery works.
所以我的计划是:
- 编写部分文件
- 运行事件循环,直到它根据条件挂起
- 编写文件的其余部分
- 运行事件循环直到完成
我以为这是答案:检测空闲的异步事件循环
然而,试用测试表明它退出得太早了:
However a trial test shows that it exits too soon:
import asyncio
import random
def test_ping_pong():
async def ping_pong(idx: int, oth_idx: int):
for i in range(random.randint(100, 1000)):
counters[idx] += 1
async with conditions[oth_idx]:
conditions[oth_idx].notify()
async with conditions[idx]:
await conditions[idx].wait()
async def detect_iowait():
loop = asyncio.get_event_loop()
rsock, wsock = socket.socketpair()
wsock.close()
try:
await loop.sock_recv(rsock, 1)
finally:
rsock.close()
conditions = [asyncio.Condition(), asyncio.Condition()]
counters = [0, 0]
loop = asyncio.get_event_loop()
loop.create_task(ping_pong(0, 1))
loop.create_task(ping_pong(1, 0))
loop.run_until_complete(loop.create_task(detect_iowait()))
assert counters[0] > 10
assert counters[1] > 10
推荐答案
在挖掘 Python 事件循环的源代码后,我发现没有任何公开的内容可以公开做到这一点.
After digging through the source code for python's event loops, I've found nothing exposed that can do this publicly.
然而,可以使用由 BaseEventLoop
创建的 _ready
双端队列.请参阅此处.这包含可以立即运行的每个任务.当任务运行时,它会从 _ready
双端队列中弹出.当一个挂起的任务被另一个任务释放时(例如通过调用 future.set_result()) 挂起的任务会立即添加回双端队列.这从 python 3.5 开始就存在了.
It is however possible to use the _ready
deque created by the BaseEventLoop
. See here. This contains every task that is immediately ready to run. When a task is run it is popped from the _ready
deque. When a suspended task is released by another task (eg by calling future.set_result()) the suspended task is immediately added back to the deque. This has existed since python 3.5.
您可以做的一件事是重复注入回调以检查 _ready
中有多少项.当所有其他任务被挂起时,回调运行时dqueue中将没有任何东西.
One thing that you can do is repeatedly inject a callback to check how many items in _ready
. When all other tasks are suspended, there will be nothing left in the dqueue at the moment the callback runs.
回调将在事件循环的每次迭代中最多运行一次:
The callback will run at most once per iteration of the event loop:
async def wait_for_deadlock(empty_loop_threshold: int = 0):
def check_for_deadlock():
nonlocal empty_loop_count
# pylint: disable=protected-access
if loop._ready:
empty_loop_count = 0
loop.call_soon(check_for_deadlock)
elif empty_loop_count < empty_loop_threshold:
empty_loop_count += 1
loop.call_soon(check_for_deadlock)
else:
future.set_result(None)
empty_loop_count = 0
loop = asyncio.get_running_loop()
future = loop.create_future()
asyncio.get_running_loop().call_soon(check_for_deadlock)
await future
在上面的代码中,empty_loop_threshold
在大多数情况下并不是真正必要的,但在任务与 IO 通信的情况下存在.例如,如果一个任务通过 IO 与另一个任务进行通信,则可能会出现所有任务暂停的时刻,即使一个任务已准备好读取数据.设置 empty_loop_threshold = 1
应该可以解决这个问题.
In the above code the empty_loop_threshold
is not really necessary in most cases but exists for cases where tasks communicate with IO. For example if one task communicates to another through IO, there may be a moment where all tasks are suspended even through one has data ready to read. Setting empty_loop_threshold = 1
should get round this.
使用这个比较简单.您可以:
Using this is relatively simple. You can:
loop.run_until_complete(wait_for_deadlock())
或者按照我的问题要求:
Or as requested in my question:
def some_test():
async def async_test():
await wait_for_deadlock()
inject_something()
await wait_for_deadlock()
loop = loop.get_event_loop()
loop.create_task(task_to_test())
loop.run_until_complete(loop.create_task(async_test)
assert something
这篇关于是否可以检测到所有异步任务何时暂停?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!