Create generator that yields coroutine results as the coroutines finish
Problem Description
Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?
Recommended Answer
asyncio.as_completed(), currently barely documented, takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

loop = asyncio.get_event_loop()

# Prints 'second', then 'third', then 'first'
loop.run_until_complete(main())
... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)
In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.
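The same pattern generalises to a small reusable helper. The sketch below is illustrative, not part of asyncio: the names iterate_as_completed and delayed are invented here, and the helper creates its own event loop explicitly, which tends to be safer on newer Python versions where calling asyncio.get_event_loop() outside a running loop is deprecated.

```python
import asyncio

def iterate_as_completed(coros):
    """Yield the results of the given coroutines as each one finishes.

    An ordinary (synchronous) generator; the event loop runs internally.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        for future in asyncio.as_completed(list(coros)):
            yield loop.run_until_complete(future)
    finally:
        loop.close()

async def delayed(value, delay):
    # Stand-in for real async work such as an HTTP request.
    await asyncio.sleep(delay)
    return value

# Results arrive in completion order, not submission order.
results = list(iterate_as_completed([
    delayed('a', 0.3),
    delayed('b', 0.1),
    delayed('c', 0.2),
]))
print(results)  # ['b', 'c', 'a']
```

Any collection of coroutines can be passed in; the caller consumes the results with a plain for loop, exactly as with ordinary_generator() above.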
As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():
import asyncio
import concurrent.futures

# first(), second() and third() are defined as in the previous example.

def ordinary_generator():
    loop = asyncio.get_event_loop()
    pending = [first(), second(), third()]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()
This approach, maintaining a list of pending jobs, has the advantage that we can adapt it to add jobs to the pending list on the fly. This is useful in use cases where our async jobs can add an unpredictable number of further jobs to the queue - like a web spider that follows all links on each page that it visits.