Create generator that yields coroutine results as the coroutines finish

Question

Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?
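
The question doesn't include the original code, but a minimal sketch of such a sequential generator might look like this (the requests library and the fetch_pages name are assumptions for illustration):

import requests

def fetch_pages(urls):
    # Each request blocks until it finishes, so the total runtime is the
    # sum of all request times rather than the time of the slowest one.
    for url in urls:
        yield requests.get(url).text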

Solution

asyncio.as_completed(), currently barely documented, takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

loop = asyncio.get_event_loop()

# Prints 'second', then 'third', then 'first'
loop.run_until_complete(main())

... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)

In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.
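
Since the question mentions aiohttp, here is a minimal sketch of the same pattern applied to real HTTP requests. The fetch and fetch_all names are hypothetical, and a production version would likely share a single ClientSession across requests rather than creating one per request:

import asyncio
import aiohttp

async def fetch(url):
    # One session per request keeps the sketch self-contained; reusing
    # a single session across requests would be more efficient.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

def fetch_all(urls):
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([fetch(url) for url in urls]):
        yield loop.run_until_complete(future)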

As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():

import asyncio
import concurrent.futures

def ordinary_generator():
    loop = asyncio.get_event_loop()
    pending = [first(), second(), third()]
    while pending:
        # Block until at least one job finishes, then yield every
        # result that is ready before waiting again.
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()

This approach, maintaining a list of pending jobs, has the advantage that we can adapt it to add jobs to the pending list on the fly. This is useful in use cases where our async jobs can add an unpredictable number of further jobs to the queue - like a web spider that follows all links on each page that it visits.
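
As a rough sketch of that adaptation, the generator below schedules newly discovered links as additional jobs while earlier jobs are still in flight. The crawl and spider names and the link-discovery logic are hypothetical placeholders, and it mirrors the answer's style of passing coroutines directly to asyncio.wait (newer Python versions require wrapping them in tasks first):

import asyncio
import concurrent.futures

async def crawl(url):
    # Placeholder job: fetch a page and return any links found on it.
    await asyncio.sleep(1)
    discovered_links = []  # a real spider would parse the page here
    return url, discovered_links

def spider(start_urls):
    loop = asyncio.get_event_loop()
    seen = set(start_urls)
    pending = [crawl(url) for url in start_urls]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=concurrent.futures.FIRST_COMPLETED)
        )
        for job in done:
            url, links = job.result()
            for link in links:
                if link not in seen:
                    seen.add(link)
                    # New jobs join the pending set on the fly.
                    pending.add(crawl(link))
            yield url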
