异步发电机 [英] Synchronous generator in asyncio
问题描述
我有以下情况:
- 我有一个阻塞的同步发电机
- 我有一个非阻塞的异步功能
我想在事件循环上运行阻塞生成器(在ThreadPool
中执行)和async
函数.我该如何实现?
I would like to run blocking generator (executed in a ThreadPool
) and the async
function on the event loop. How do I achieve this?
以下函数只是打印生成器的输出,而不是sleep
函数的输出.
The following function simply prints the output from the generator, not from sleep
function.
谢谢!
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import asyncio
import time
def f():
while True:
r = np.random.randint(0, 3)
time.sleep(r)
yield r
async def gen():
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor()
gen = await loop.run_in_executor(executor, f)
for item in gen:
print(item)
print('Inside generator')
async def sleep():
while True:
await asyncio.sleep(1)
print('Inside async sleep')
async def combine():
await asyncio.gather(sleep(), gen())
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(combine())
if __name__ == '__main__':
main()
推荐答案
run_in_executor
在生成器上不起作用,因为它是为阻止函数而设计的.尽管生成器是有效的函数,但生成器在被调用时会立即返回,从而提供调用方应该通过重复调用next
耗尽的对象. (这是Python的for
循环在后台执行的操作.)要使用异步代码中的阻塞生成器,您有两种选择:
run_in_executor
doesn't work on generators because it is designed for blocking functions. While a generator is a valid function, it returns immediately when called, providing an object that the caller is supposed to exhaust through repeated invocations of next
. (This is what Python's for
loop does under the hood.) To use a blocking generator from async code, you have two choices:
- 将迭代的每个步骤(每个对
next
的单独调用)包装到对run_in_executor
的单独调用中,或者 - 在单独的线程中启动
for
循环,并使用队列将对象传输到异步使用者.
- wrap each step of the iteration (each individual call to
next
) in a separate call torun_in_executor
, or - start a
for
loop in a separate thread and use a queue to transfer the objects to an async consumer.
这两种方法都可以抽象为一个接受迭代器并返回等效异步迭代器的函数.这是第二种方法的实现:
Either approach can be abstracted into a function that accepts an iterator and returns an equivalent async iterator. This is an implementation of the second approach:
import asyncio, threading
def async_wrap_iter(it):
"""Wrap blocking iterator into an asynchronous one"""
loop = asyncio.get_event_loop()
q = asyncio.Queue(1)
exception = None
_END = object()
async def yield_queue_items():
while True:
next_item = await q.get()
if next_item is _END:
break
yield next_item
if exception is not None:
# the iterator has raised, propagate the exception
raise exception
def iter_to_queue():
nonlocal exception
try:
for item in it:
# This runs outside the event loop thread, so we
# must use thread-safe API to talk to the queue.
asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
except Exception as e:
exception = e
finally:
asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()
threading.Thread(target=iter_to_queue).start()
return yield_queue_items()
可以使用使用time.time()
进行阻止的普通同步迭代器和异步心跳函数来证明事件循环正在运行来对其进行测试:
It can be tested with a trivial sync iterator that uses time.time()
to block and an async heartbeat function to prove that the event loop is running:
# async_wrap_iter definition as above
import time
def test_iter():
for i in range(5):
yield i
time.sleep(1)
async def test():
ait = async_wrap_iter(test_iter())
async for i in ait:
print(i)
async def heartbeat():
while True:
print('alive')
await asyncio.sleep(.1)
async def main():
asyncio.create_task(heartbeat())
await test()
asyncio.run(main())
这篇关于异步发电机的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!