asynchronous python itertools chain multiple generators
Question
Updated for clarity:

suppose I have 2 processing generator functions:
def gen1():      # just for examples,
    yield 1      # yields actually carry
    yield 2      # different computation weight
    yield 3      # in my case

def gen2():
    yield 4
    yield 5
    yield 6
I can chain them with itertools:
from itertools import chain
mix = chain(gen1(), gen2())
and then I can create another generator function object with it,
def mix_yield():
    for item in mix:
        yield item
or simply if I just want to next(mix), it's there.
My question is, how do I do the equivalent in async code? Because I need it to:

- return in yield (one by one), or with next iterator
- the fastest resolved yield first (async)
PREV. UPDATE:
After experimenting and researching, I found the aiostream library, which states it is an async version of itertools, so here is what I did:
import asyncio
from aiostream import stream

async def gen1():
    await asyncio.sleep(0)
    yield 1
    await asyncio.sleep(0)
    yield 2
    await asyncio.sleep(0)
    yield 3

async def gen2():
    await asyncio.sleep(0)
    yield 4
    await asyncio.sleep(0)
    yield 5
    await asyncio.sleep(0)
    yield 6

a_mix = stream.combine.merge(gen1(), gen2())

async def a_mix_yield():
    for item in a_mix:
        yield item
But I still can't do next(a_mix):

TypeError: 'merge' object is not an iterator

or next(await a_mix):

raise StreamEmpty()
Although I can still make it into a list:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
so one goal is completed, one more to go:
- return in yield (one by one), or with next iterator
- the fastest resolved yield first (async)
Answer
Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. There is no anext global function (one was later added as a built-in in Python 3.10), but one could easily write it:
async def anext(aiterator):
    return await aiterator.__anext__()
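A quick usage sketch of this helper; agen here is a made-up two-element async generator, not part of the question's code:

```python
import asyncio

async def anext(aiterator):
    # Convenience wrapper around the async-iterator protocol method.
    return await aiterator.__anext__()

async def agen():
    yield "a"
    yield "b"

async def main():
    it = agen()  # async generators are their own async iterators
    return [await anext(it), await anext(it)]

pair = asyncio.run(main())
print(pair)  # ['a', 'b']
```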
But the savings are so small that, in the rare situations where this is needed, one may as well invoke __anext__ directly. The async iterator is in turn obtained from an async iterable by calling its __aiter__ method (in analogy to __iter__ provided by regular iterables). Async iteration driven manually looks like this:
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
__anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators, one should use async for.
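To make the protocol concrete, here is a small self-contained sketch (agen is a hypothetical two-element generator) that drives iteration by hand and stops on StopAsyncIteration:

```python
import asyncio

async def agen():
    yield 1
    yield 2

async def main():
    it = agen().__aiter__()  # regular method returning the async iterator
    out = []
    try:
        while True:
            out.append(await it.__anext__())  # one element per call
    except StopAsyncIteration:
        pass  # raised once the generator is exhausted
    return out

result = asyncio.run(main())
print(result)  # [1, 2]
```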
Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:
async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())
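If the aiostream dependency is undesirable, a "fastest resolved yield first" merge can also be sketched with just the standard library. The merge and timed helpers below are illustrative names I made up for this sketch, not an established API; each source is drained into a shared queue, so whichever generator produces a value first gets yielded first:

```python
import asyncio

async def merge(*aiters):
    # Fan every source into one queue; whichever yields first comes out first.
    queue = asyncio.Queue()
    DONE = object()  # sentinel marking one exhausted source

    async def drain(aiter):
        async for item in aiter:
            await queue.put(item)
        await queue.put(DONE)

    tasks = [asyncio.ensure_future(drain(a)) for a in aiters]
    finished = 0
    try:
        while finished < len(aiters):
            item = await queue.get()
            if item is DONE:
                finished += 1
            else:
                yield item
    finally:
        for t in tasks:
            t.cancel()

async def timed(delay, *values):
    # Hypothetical source: emits each value after `delay` seconds.
    for v in values:
        await asyncio.sleep(delay)
        yield v

async def collect():
    return [x async for x in merge(timed(0.01, 1, 2), timed(0.05, 10, 20))]

results = asyncio.run(collect())
print(results)  # [1, 2, 10, 20] - ordered by completion time, not by source
```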