如何用迭代器包装asyncio [英] How to wrap asyncio with iterator
问题描述
我有以下简化代码:
async def asynchronous_function(*args, **kwds):
statement = await prepare(query)
async with conn.transaction():
async for record in statement.cursor():
??? yield record ???
...
class Foo:
def __iter__(self):
records = ??? asynchronous_function ???
yield from records
...
x = Foo()
for record in x:
...
我不知道如何填写上面的???
.我想产生记录数据,但是如何包装异步代码真的不是很明显.
I don't know how to fill in the ???
above. I want to yield the record data, but it's really not obvious how to wrap asyncio code.
推荐答案
虽然确实打算将asyncio全面使用,但有时根本无法立即转换大型软件(包括其所有依赖项) )进行异步处理.幸运的是,有一些方法可以将传统同步代码与新编写的异步部分结合在一起.一种简单的方法是在专用线程中运行事件循环,并使用
While it is true that asyncio is intended to be used across the board, sometimes it is simply impossible to immediately convert a large piece of software (with all its dependencies) to async. Fortunately there are ways to combine legacy synchronous code with newly written asyncio portions. A straightforward way to do so is by running the event loop in a dedicated thread, and using asyncio.run_coroutine_threadsafe
to submit tasks to it.
使用这些低级工具,您可以编写一个通用适配器,将任何异步迭代器转换为同步迭代器.例如:
With those low-level tools you can write a generic adapter to turn any asynchronous iterator into a synchronous one. For example:
import asyncio, threading, queue
# create an asyncio loop that runs in the background to
# serve our asyncio needs
loop = asyncio.get_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
def wrap_async_iter(ait):
"""Wrap an asynchronous iterator into a synchronous one"""
q = queue.Queue()
_END = object()
def yield_queue_items():
while True:
next_item = q.get()
if next_item is _END:
break
yield next_item
# After observing _END we know the aiter_to_queue coroutine has
# completed. Invoke result() for side effect - if an exception
# was raised by the async iterator, it will be propagated here.
async_result.result()
async def aiter_to_queue():
try:
async for item in ait:
q.put(item)
finally:
q.put(_END)
async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
return yield_queue_items()
然后您的代码只需要调用wrap_async_iter
即可将一个异步迭代包装到一个同步对象中:
Then your code just needs to call wrap_async_iter
to wrap an async iter into a sync one:
async def mock_records():
for i in range(3):
yield i
await asyncio.sleep(1)
for record in wrap_async_iter(mock_records()):
print(record)
在您的情况下,Foo.__iter__
将使用yield from wrap_async_iter(asynchronous_function(...))
.
In your case Foo.__iter__
would use yield from wrap_async_iter(asynchronous_function(...))
.
这篇关于如何用迭代器包装asyncio的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!