在python3中合并异步迭代 [英] Merging async iterables in python3
问题描述
在python3中合并异步迭代器有什么好的方法或支持良好的库吗?
Is there a good way, or a well-supported library, for merging async iterators in python3?
所需的行为与在reactivex中合并observables的行为基本相同.
The desired behavior is basically the same as that of merging observables in reactivex.
也就是说,在正常情况下,如果我正在合并两个异步迭代器,我希望生成的异步迭代器按时间顺序产生结果.迭代器之一的错误应该使合并的迭代器脱轨.
That is, in the normal case, if I'm merging two async iterator, I want the resulting async iterator to yield results chronologically. An error in one of the iterators should derail the merged iterator.
(来源:http://reactivex.io/documentation/operators/merge.html)
这是我最好的尝试,但似乎有一个标准的解决方案:
This is my best attempt, but it seems like something there might be a standard solution to:
async def drain(stream, q, sentinal=None):
try:
async for item in stream:
await q.put(item)
if sentinal:
await q.put(sentinal)
except BaseException as e:
await q.put(e)
async def merge(*streams):
q = asyncio.Queue()
sentinal = namedtuple("QueueClosed", ["truthy"])(True)
futures = {
asyncio.ensure_future(drain(stream, q, sentinal)) for stream in streams
}
remaining = len(streams)
while remaining > 0:
result = await q.get()
if result is sentinal:
remaining -= 1
continue
if isinstance(result, BaseException):
raise result
yield result
if __name__ == "__main__":
# Example: Should print:
# 1
# 2
# 3
# 4
loop = asyncio.get_event_loop()
async def gen():
yield 1
await asyncio.sleep(1.5)
yield 3
async def gen2():
await asyncio.sleep(1)
yield 2
await asyncio.sleep(1)
yield 4
async def go():
async for x in merge(gen(), gen2()):
print(x)
loop.run_until_complete(go())
推荐答案
您可以使用 aiostream.stream.merge:
from aiostream import stream
async def go():
async for x in stream.merge(gen(), gen2()):
print(x)
这篇关于在python3中合并异步迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!