来自异步生成器的asyncio as_yield [英] asyncio as_yielded from async generators

查看:124
本文介绍了来自异步生成器的asyncio as_yield的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望能够从许多异步协程中产生收益. Asyncio的as_completed有点类似于我要寻找的内容(即,我希望任何协程都能在任何时候返回给调用者,然后继续执行),但这似乎只允许常规协程具有单收益.

I'm looking to be able to yield from a number of async coroutines. Asyncio's as_completed is kind of close to what I'm looking for (i.e. I want any of the coroutines to be able to yield at any time back to the caller and then continue), but that only seems to allow regular coroutines with a single return.

这是我到目前为止所拥有的:

Here's what I have so far:

import asyncio


async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_


async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return


async def main():
    runs = [test(i) for i in range(3)]

    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

我追求的是runs = [test_gen(i) for i in range(3)]替换runs = [test(i) for i in range(3)],并让for i in asyncio.as_completed(runs)迭代每个产量.

Replacing runs = [test(i) for i in range(3)] with runs = [test_gen(i) for i in range(3)] and for for i in asyncio.as_completed(runs) to iterate on each yield is what I'm after.

是否可以用Python表示,并且可能有第三方为协程流程提供了比标准库更多的选择吗?

Is this possible to express in Python and are there any third party maybe that give you more options then the standard library for coroutine process flow?

谢谢

推荐答案

您可以使用 aiostream.stream.merge :

from aiostream import stream

async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

安全上下文中运行它确保在迭代后正确清理生成器:

Run it in a safe context to make sure the generators are cleaned up properly after the iteration:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

或者使用管道:

from aiostream import stream, pipe

async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))

文档中的更多示例.

地址@ nirvana-msu评论

可以通过相应地准备源来识别产生给定值的生成器:

It is possible to identify the generator that yielded a given value by preparing sources accordingly:

async def main():
    runs = [test_gen(i) for i in range(3)]
    sources = [stream.map(xs, lambda x: (i, x)) for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')

这篇关于来自异步生成器的asyncio as_yield的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆