异步python itertools链多个生成器 [英] asynchronous python itertools chain multiple generators

查看:134
本文介绍了异步python itertools链多个生成器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

更新的清晰度:

假设我有2个处理生成器函数:

suppose I have 2 processing generator functions:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

我可以用itertools链接它们

I can chain them with itertools

from itertools import chain

mix = chain(gen1(), gen2())

然后我可以用它创建另一个生成器函数对象,

and then I can create another generator function object with it,

def mix_yield():
   for item in mix:
      yield item

或者只是如果我只想next(mix),它就在那里.

or simply if I just want to next(mix), it's there.

我的问题是,我该如何做异步代码中的等效项目?

因为我需要它:

  • 收益率(一对一),或使用next迭代器
  • 最快解决问题的收益率最高(异步)
  • return in yield (one by one), or with next iterator
  • the fastest resolved yield first (async)

PREV.更新:

经过实验和研究,我发现 aiostream 库,该库声明为异步版本的itertools,所以我做了什么:

After experimenting and researching, I found aiostream library which states as async version of itertools, so what I did:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

但是我仍然不能做next(a_mix)

TypeError: 'merge' object is not an iterator

next(await a_mix)

raise StreamEmpty()

尽管我仍然可以将其列入列表:

Although I still can make it into a list:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

一个目标就完成了,还有一个目标要走:

so one goal is completed, one more to go:

  • 收益率(一对一),或者使用next迭代器

-最快解决的收益率最高(异步)

推荐答案

Python的 next 内置函数只是在对象上调用基础__next__方法的便捷方法. __next__的异步等效项是异步迭代器上的__anext__方法.没有anext全局函数,但是可以很容易地编写它:

Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. There is no anext global function, but one could easily write it:

async def anext(aiterator):
    return await aiterator.__anext__()

但是节省的费用很小,以至于在极少数情况下需要时,也可以直接调用__anext__.异步迭代器又可以通过调用__aiter__(类似于常规可迭代对象提供的__iter__)从异步 iterable 中获得.手动驱动的异步迭代如下所示:

But the savings is so small that, in rare situations when this is needed, one may as well invoke __anext__ directly. The async iterator is in turn obtained from an async iterable by calling the __aiter__ (in analogy to __iter__ provided by regular iterables). Async iteration driven manually looks like this:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

当没有更多元素可用时,

__anext__将引发StopAsyncIteration.要遍历异步迭代器,应使用async for.

__anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators one should use async for.

根据您的代码,这是一个可运行的示例,同时使用__anext__async for耗尽用aiostream.stream.combine.merge设置的流:

Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())

这篇关于异步python itertools链多个生成器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆