如何将 asyncio 与现有的阻塞库一起使用? [英] How to use asyncio with existing blocking library?

查看:30
本文介绍了如何将 asyncio 与现有的阻塞库一起使用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有几个阻塞函数 foobar 并且我无法更改它们(一些我无法控制的内部库.与一个或多个网络服务对话).我如何将其用作异步?例如.我不想做以下事情.

I have few blocking functions foo, bar and I can't change those (Some internal library I don't control. Talks to one or more network services). How do I use it as async?. E.g. I wan't to do the following.

results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)

这将是低效的,因为我可以为第二个输入调用 foo,而我正在等待第一个输入,对于 bar 也是如此.我如何包装它们以便它们可用于 asyncio(即新的 asyncawait 语法)?

This will be inefficient as I can call foo for the second input while I am waiting for the first and same for bar. How do I wrap them such that they are usable with asyncio (i.e new async, await syntax)?

让我们假设函数是可重入的.即,当之前的 foo 已经在处理时,再次调用 foo 是可以的.

Lets assume the functions are re-entrant. i.e it is fine to call foo again when already a previous foo is processing.

更新

使用可重用装饰器扩展答案.例如,点击此处.

Extending answer with reusable decorator. Click here for example.

def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner

推荐答案

这里有(排序)两个问题:第一,如何异步运行阻塞代码,第二,如何并发运行异步代码(asyncio 是单-线程,所以 GIL 仍然适用,所以它不是真正并行的,但我离题了).

There are (sort of) two questions here: first, how to run blocking code asynchronously, and second, how to run async code concurrently (asyncio is single-threaded, so the GIL still applies, so it isn't truly parallel, but I digress).

可以使用 asyncio.ensure_future 创建并发任务,如文档所述 这里.

Concurrent tasks can be created using asyncio.ensure_future, as documented here.

要运行同步代码,您需要在执行器中运行阻塞代码.示例:

To run synchronous code, you will need to run the blocking code in an executor. Example:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')

async def non_blocking(loop, executor):
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),
            
            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),
            
            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
loop.run_until_complete(non_blocking(loop, executor))

如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但基本方法是使用 for 循环(或列表理解等),用 asyncio.wait 等待它们,然后然后检索结果.示例:

If you want to schedule these tasks using a for loop (as in your example), you have several different strategies, but the underlying approach is to schedule the tasks using the for loop (or list comprehension, etc), await them with asyncio.wait, and then retrieve the results. Example:

done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]

这篇关于如何将 asyncio 与现有的阻塞库一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆