异步发电机 [英] Synchronous generator in asyncio

查看:75
本文介绍了异步发电机的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下情况:

  1. 我有一个阻塞的同步发电机
  2. 我有一个非阻塞的异步功能

我想在事件循环上运行阻塞生成器(在ThreadPool中执行)和async函数.我该如何实现?

I would like to run blocking generator (executed in a ThreadPool) and the async function on the event loop. How do I achieve this?

以下函数只是打印生成器的输出,而不是sleep函数的输出.

The following function simply prints the output from the generator, not from sleep function.

谢谢!

from concurrent.futures import ThreadPoolExecutor

import numpy as np
import asyncio
import time


def f():
    while True:
        r = np.random.randint(0, 3)
        time.sleep(r)
        yield r


async def gen():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    gen = await loop.run_in_executor(executor, f)
    for item in gen:
        print(item)
        print('Inside generator')


async def sleep():
    while True:
        await asyncio.sleep(1)
        print('Inside async sleep')


async def combine():
    await asyncio.gather(sleep(), gen())


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(combine())


if __name__ == '__main__':
    main()

推荐答案

run_in_executor在生成器上不起作用,因为它是为阻止函数而设计的.尽管生成器是有效的函数,但生成器在被调用时会立即返回,从而提供调用方应该通过重复调用next耗尽的对象. (这是Python的for循环在后台执行的操作.)要使用异步代码中的阻塞生成器,您有两种选择:

run_in_executor doesn't work on generators because it is designed for blocking functions. While a generator is a valid function, it returns immediately when called, providing an object that the caller is supposed to exhaust through repeated invocations of next. (This is what Python's for loop does under the hood.) To use a blocking generator from async code, you have two choices:

  • 将迭代的每个步骤(每个对next的单独调用)包装到对run_in_executor的单独调用中,或者
  • 在单独的线程中启动for循环,并使用队列将对象传输到异步使用者.
  • wrap each step of the iteration (each individual call to next) in a separate call to run_in_executor, or
  • start a for loop in a separate thread and use a queue to transfer the objects to an async consumer.

这两种方法都可以抽象为一个接受迭代器并返回等效异步迭代器的函数.这是第二种方法的实现:

Either approach can be abstracted into a function that accepts an iterator and returns an equivalent async iterator. This is an implementation of the second approach:

import asyncio, threading

def async_wrap_iter(it):
    """Wrap blocking iterator into an asynchronous one"""
    loop = asyncio.get_event_loop()
    q = asyncio.Queue(1)
    exception = None
    _END = object()

    async def yield_queue_items():
        while True:
            next_item = await q.get()
            if next_item is _END:
                break
            yield next_item
        if exception is not None:
            # the iterator has raised, propagate the exception
            raise exception

    def iter_to_queue():
        nonlocal exception
        try:
            for item in it:
                # This runs outside the event loop thread, so we
                # must use thread-safe API to talk to the queue.
                asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
        except Exception as e:
            exception = e
        finally:
            asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()

    threading.Thread(target=iter_to_queue).start()
    return yield_queue_items()

可以使用使用time.time()进行阻止的普通同步迭代器和异步心跳函数来证明事件循环正在运行来对其进行测试:

It can be tested with a trivial sync iterator that uses time.time() to block and an async heartbeat function to prove that the event loop is running:

# async_wrap_iter definition as above

import time

def test_iter():
    for i in range(5):
        yield i
        time.sleep(1)

async def test():
    ait = async_wrap_iter(test_iter())
    async for i in ait:
        print(i)

async def heartbeat():
    while True:
        print('alive')
        await asyncio.sleep(.1)

async def main():
    asyncio.create_task(heartbeat())
    await test()

asyncio.run(main())

这篇关于异步发电机的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆