在python3中合并异步迭代 [英] Merging async iterables in python3

查看:46
本文介绍了在python3中合并异步迭代的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在python3中合并异步迭代器有什么好的方法或支持良好的库吗?

Is there a good way, or a well-supported library, for merging async iterators in python3?

所需的行为与在reactivex中合并observables的行为基本相同.

The desired behavior is basically the same as that of merging observables in reactivex.

也就是说,在正常情况下,如果我正在合并两个异步迭代器,我希望生成的异步迭代器按时间顺序产生结果.迭代器之一的错误应该使合并的迭代器脱轨.

That is, in the normal case, if I'm merging two async iterator, I want the resulting async iterator to yield results chronologically. An error in one of the iterators should derail the merged iterator.

(来源:http://reactivex.io/documentation/operators/merge.html)

这是我最好的尝试,但似乎有一个标准的解决方案:

This is my best attempt, but it seems like something there might be a standard solution to:

async def drain(stream, q, sentinal=None):
    try:
        async for item in stream:
            await q.put(item)
        if sentinal:
            await q.put(sentinal)
    except BaseException as e:
        await q.put(e)


async def merge(*streams):

    q = asyncio.Queue()
    sentinal = namedtuple("QueueClosed", ["truthy"])(True)

    futures = {
        asyncio.ensure_future(drain(stream, q, sentinal)) for stream in streams
    }

    remaining = len(streams)
    while remaining > 0:
        result = await q.get()
        if result is sentinal:
            remaining -= 1
            continue
        if isinstance(result, BaseException):
            raise result
        yield result


if __name__ == "__main__":

    # Example: Should print:
    #   1
    #   2
    #   3
    #   4

    loop = asyncio.get_event_loop()

    async def gen():
        yield 1
        await asyncio.sleep(1.5)
        yield 3

    async def gen2():
        await asyncio.sleep(1)
        yield 2
        await asyncio.sleep(1)
        yield 4

    async def go():
        async for x in merge(gen(), gen2()):
            print(x)

    loop.run_until_complete(go())

推荐答案

您可以使用 aiostream.stream.merge:

from aiostream import stream

async def go():
    async for x in stream.merge(gen(), gen2()):
        print(x)

文档 和此 答案.

这篇关于在python3中合并异步迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆