包装异步。在超时时聚集 [英] Wrapping asyncio.gather in a timeout

查看:32
本文介绍了包装异步。在超时时聚集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我看过asyncio.gather vs asyncio.wait,但不确定这是否解决了这个特定问题。我要做的是用timeout参数将asyncio.gather()协程包装在asyncio.wait_for()中。我还需要满足以下条件:

  • return_exceptions=True(来自asyncio.gather())-我希望在结果中包含异常实例,而不是将异常传播到等待时间gather()的任务
  • 顺序:保留asyncio.gather()的属性,即结果顺序与输入顺序相同。(否则,将输出映射回输入。)asyncio.wait_for()未通过此条件,我不确定实现此条件的理想方法。

超时针对整个整个asyncio.gather()等待对象列表--如果它们陷入超时或返回异常,这两种情况中的任何一种都应该将异常实例放在结果列表中。

考虑此设置:

>>> import asyncio
>>> import random
>>> from time import perf_counter
>>> from typing import Iterable
>>> from pprint import pprint
>>> 
>>> async def coro(i, threshold=0.4):
...     await asyncio.sleep(i)
...     if i > threshold:
...         # For illustration's sake - some coroutines may raise,
...         # and we want to accomodate that and just test for exception
...         # instances in the results of asyncio.gather(return_exceptions=True)
...         raise Exception("i too high")
...     return i
... 
>>> async def main(n, it: Iterable):
...     res = await asyncio.gather(
...         *(coro(i) for i in it),
...         return_exceptions=True
...     )
...     return res
... 
>>> 
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> res = asyncio.run(main(n, it=it))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")  # Expectation: ~1 seconds
Done main(10) in 0.86 seconds
>>> pprint(dict(zip(it, res)))
{0.01323751590501987: 0.01323751590501987,
 0.07422124156714727: 0.07422124156714727,
 0.3088946587429545: 0.3088946587429545,
 0.3113884366691503: 0.3113884366691503,
 0.4419557492849159: Exception('i too high'),
 0.4844375347808497: Exception('i too high'),
 0.5796792804615848: Exception('i too high'),
 0.6338658027451068: Exception('i too high'),
 0.7426396870165088: Exception('i too high'),
 0.8614799253779063: Exception('i too high')}
上面的程序,使用n = 10,执行运行时间为0.5秒,在异步运行时还会有一些开销。(random.random()将均匀分布在[0,1)中。)

假设我要将其作为超时强加给整个操作(即在协程main()上):

timeout = 0.5

现在可以使用asyncio.wait(),但问题是结果是set对象,所以肯定不能保证asyncio.gather()的排序返回值属性:

>>> async def main(n, it, timeout) -> tuple:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     done, pending = await asyncio.wait(tasks, timeout=timeout)
...     return done, pending
... 
>>> timeout = 0.5
>>> random.seed(444)
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> done, pending = asyncio.run(main(n, it=it, timeout=timeout))
>>> for i in pending:
...     i.cancel()
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds
>>> done
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>}
>>> pprint(done)
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>}
>>> pprint(pending)
{<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>}
如上所述,问题在于我似乎无法将task实例映射回iterable中的输入。它们的任务ID通过tasks = [asyncio.create_task(coro(i)) for i in it]有效地丢失在函数作用域中。是否有Python方式/使用异步API来模仿asyncio.gather()此处的行为?

推荐答案

查看底层_wait()协程,此协程将传递一个任务列表,并将修改这些任务的状态。这意味着,在main()范围内,tasks = [asyncio.create_task(coro(i)) for i in it]中的tasks将通过调用await asyncio.wait(tasks, timeout=timeout)进行修改。与返回(done, pending)元组不同,一种解决方法是只返回tasks本身,这保留了输入it的顺序。wait()/_wait()只是将任务分成已完成/挂起的子集,在这种情况下,我们可以丢弃这些子集,并使用其元素已更改的tasks的整个列表。

在这种情况下有三种可能的任务状态:

  • 任务返回有效结果(coro()),未引发异常,并且在timeout下完成。它的.cancelled()将为false,并且它有一个有效的.result(),该.result()不是异常实例
  • 任务在有机会返回结果或引发异常之前超时。它将显示.cancelled(),其.exception()将引发CancelledError
  • 允许时间完成并从coro()引发异常的任务;它将显示.cancelled()为False,其exception()将引发

(所有这些都在asyncio/futures.py中列出。)


插图:

>>> # imports/other code snippets - see question
>>> async def main(n, it, timeout) -> tuple:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     await asyncio.wait(tasks, timeout=timeout)
...     return tasks  # *not* (done, pending)

>>> timeout = 0.5
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> tasks = asyncio.run(main(n, it=it, timeout=timeout))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds

>>> pprint(tasks)
[<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>]

现在应用上面的逻辑,它使res保持与输入相对应的顺序:

>>> res = []
>>> for t in tasks:
...     try:
...         r = t.result()
...     except Exception as e:
...         res.append(e)
...     else:
...         res.append(r)
>>> pprint(res)
[0.3088946587429545,
 0.01323751590501987,
 Exception('i too high'),
 CancelledError(),
 CancelledError(),
 CancelledError(),
 Exception('i too high'),
 0.3113884366691503,
 0.07422124156714727,
 CancelledError()]
>>> dict(zip(it, res))
{0.3088946587429545: 0.3088946587429545,
 0.01323751590501987: 0.01323751590501987,
 0.4844375347808497: Exception('i too high'),
 0.8614799253779063: concurrent.futures._base.CancelledError(),
 0.7426396870165088: concurrent.futures._base.CancelledError(),
 0.6338658027451068: concurrent.futures._base.CancelledError(),
 0.4419557492849159: Exception('i too high'),
 0.3113884366691503: 0.3113884366691503,
 0.07422124156714727: 0.07422124156714727,
 0.5796792804615848: concurrent.futures._base.CancelledError()}

这篇关于包装异步。在超时时聚集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆