asyncio: collecting results from an async function in an executor

Question

I would like to start a large number of HTTP requests and collect their results, once all of them have returned. Sending the requests in a non-blocking fashion is possible with asyncio, but I have problems collecting their results.

I'm aware of solutions such as aiohttp that were made for this specific problem. But the HTTP requests are just an example; my question is how to use asyncio correctly.

On the server side, I have Flask, which answers every request to localhost/ with "Hello World!", but waits 0.1 seconds before answering. In all my examples, I'm sending 10 requests. Synchronous code should take about 1 second; an asynchronous version could do it in 0.1 seconds.
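
For reference, a minimal version of such a server might look like this (a sketch reconstructed from the description; the exact app isn't important to the question):

from time import sleep
from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello():
    sleep(0.1)  # simulate the 0.1 second delay described above
    return "Hello World!"

if __name__ == "__main__":
    app.run(port=5000)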

On the client side, I want to spin up many requests at the same time and collect their results. I'm trying to do this in three different ways. Since asyncio needs an executor to work around blocking code, all of the approaches call loop.run_in_executor.

This code is shared between them:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    # note: a blocking HTTP call inside an async def -- this is the crux of the question
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

Approach 1:

Use asyncio.gather() on a list of tasks and then run_until_complete. After reading Asyncio.gather vs asyncio.wait, it seemed like gather would wait for the results. But it doesn't. This code returns instantly, without waiting for the requests to finish. If I use a blocking function here, it works. Why can't I use an async function?

# approach 1
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async))  # <---- using async function!

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003

# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}") # 0.112

Python even warns me that coroutine "request_async" was never awaited. At this point I have a working solution: using a normal (not async) function in an executor. But I would like a solution that works with async function definitions, because I want to use await inside them (in this simple example that isn't necessary, but if I move more code to asyncio, I'm sure it will become important).

Approach 2:

Python warns me that my coroutines are never awaited, so let's await them. Approach 2 wraps all the code into an outer async function and awaits the result of the gathering. Same problem: it also returns instantly (and raises the same warning):

# approach 2
async def main():

    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))

    gathered_tasks = asyncio.gather(*tasks)

    return await gathered_tasks # <-------- here I'm waiting on the coroutine 

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036

This really confused me. I'm awaiting the result of gather; intuitively, that should propagate to the coroutines I'm gathering. But Python still complains that my coroutine was never awaited.

I read some more and found: How could I use requests in asyncio?

This is pretty much exactly my example: combining requests and asyncio. That brings me to approach 3:

Approach 3:

Same structure as approach 2, but await each task that was given to run_in_executor() individually (surely this counts as awaiting the coroutines):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?

Answer

My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?

The answer is that you're not supposed to have blocking code in your coroutines. If you must have it, you have to isolate it using run_in_executor. So the correct way to write request_async using requests is:

async def request_async():
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, request_sync)

Passing request_async to run_in_executor doesn't make sense, because the entire point of run_in_executor is to invoke a sync function in a different thread. If you give it a coroutine function, it will happily call it (in another thread) and provide the returned coroutine object as the "result". That's like passing a generator to code that expects an ordinary function: yes, it will call the generator just fine, but it won't know what to do with the returned object.
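
You can see this directly with a small sketch (not from the original post): hand the coroutine function to the executor and inspect what actually comes back.

import asyncio

async def request_async():
    return "hello"  # stands in for the real request

loop = asyncio.get_event_loop()
result = loop.run_until_complete(
    loop.run_in_executor(None, request_async))
print(type(result))  # <class 'coroutine'>, not the string "hello"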

In general, you cannot just put async in front of the def and expect to get a usable coroutine. A coroutine must not block, except by awaiting other asynchronous code.
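
For example (a generic sketch with sleeps standing in for I/O):

import asyncio
import time

async def bad_coroutine():
    time.sleep(0.1)  # blocks the entire event loop while sleeping

async def good_coroutine():
    await asyncio.sleep(0.1)  # suspends only this coroutine; the loop keeps running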

Once you have a usable request_async, you can collect its results like this:

async def main():
    coros = [request_async() for _i in range(10)]
    results = await asyncio.gather(*coros)
    return results

results = loop.run_until_complete(main())
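
As a side note, if you want to bound the number of threads serving the blocking calls, you can pass an explicit executor instead of None. This is a sketch of one possible refinement, not something the question asked for:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)  # at most 10 concurrent blocking requests

async def request_async():
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(executor, request_sync)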
