How to create an async generator in Python?

Question

I'm trying to rewrite this Python2.7 code to the new async world order:

import multiprocessing

def get_api_results(func, iterable):
    pool = multiprocessing.Pool(5)
    for res in pool.map(func, iterable):
        yield res

map() blocks until all results have been computed, so I'm trying to rewrite this as an async implementation that will yield results as soon as they are ready. Like map(), return values must be returned in the same order as iterable. I tried this (I need requests because of legacy auth requirements):

import asyncio

import requests

def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()

async def get_api_results():
    loop = asyncio.get_event_loop()
    futures = []
    for n in range(1, 11):
        futures.append(loop.run_in_executor(None, get, n))
    async for f in futures:
        k, v = await f
        yield k, v

for r in get_api_results():
    print(r)

but with Python 3.6 I'm getting:

  File "scratch.py", line 16, in <module>
    for r in get_api_results():
TypeError: 'async_generator' object is not iterable

How do I do this?

Answer

Regarding your older (2.7) code: multiprocessing is considered a powerful drop-in replacement for the much simpler threading module, aimed at concurrently processing CPU-intensive tasks, where threading does not work so well. Your code is probably not CPU-bound, since it just needs to make HTTP requests, so threading might have been enough for solving your problem.

However, instead of using threading directly, Python 3+ has a nice module called concurrent.futures that provides a cleaner API via its Executor classes. This module is also available for Python 2.7 as an external package.

The following code works on python 2 and python 3:

# For python 2, first run:
#
#    pip install futures
#
from __future__ import print_function

import requests
from concurrent import futures

URLS = [
    'http://httpbin.org/delay/1',
    'http://httpbin.org/delay/3',
    'http://httpbin.org/delay/6',
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.coooom/',
]


def fetch(url):
    r = requests.get(url)
    r.raise_for_status()
    return r.content


def fetch_all(urls):
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        print("All URLs submitted.")
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is None:
                yield url, future.result()
            else:
                # print('%r generated an exception: %s' % (url, future.exception()))
                yield url, None


for url, s in fetch_all(URLS):
    status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed"
    print('{}: {}'.format(url, status))

This code uses futures.ThreadPoolExecutor, which is built on threading. A lot of the magic is in the as_completed() call used here.
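Note that as_completed() yields futures in completion order, while the original question required results in the same order as the input iterable. If order matters, Executor.map() preserves input order while still running calls concurrently. A minimal sketch, using a hypothetical work() function in place of the real HTTP call so it runs without a network:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(n):
    # Stand-in for a real API call; later items finish sooner here,
    # to show that map() still yields results in input order.
    time.sleep(0.1 * (3 - n))
    return n * n

def get_api_results(func, iterable):
    # Executor.map, like multiprocessing.Pool.map, yields results in
    # input order even when later submissions complete earlier.
    with ThreadPoolExecutor(max_workers=5) as executor:
        for res in executor.map(func, iterable):
            yield res

results = list(get_api_results(work, range(3)))
print(results)  # [0, 1, 4]
```

Unlike the original Pool.map, Executor.map yields results lazily as each in-order result becomes available, so earlier results are not held back until the whole batch finishes.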

Your Python 3.6 code above uses run_in_executor(), which (with None as its first argument) runs get() on the loop's default executor, a futures.ThreadPoolExecutor, and does not really use asynchronous IO!

If you really want to go forward with asyncio, you will need to use an HTTP client that supports it, such as aiohttp. Here is an example:

import asyncio

import aiohttp


async def fetch(session, url):
    print("Getting {}...".format(url))
    async with session.get(url) as resp:
        text = await resp.text()
    return "{}: Got {} bytes".format(url, len(text))


async def fetch_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
                 for delay in (1, 1, 2, 3, 3)]
        for task in asyncio.as_completed(tasks):
            print(await task)
    return "Done."


loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()

As you can see, asyncio also has an as_completed(), this time using real asynchronous IO in a single thread of a single process.
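Finally, to answer the title question directly: the TypeError in the question comes from iterating an async generator with a plain for loop. An async generator (a coroutine containing yield, Python 3.6+) must be consumed with `async for` inside another coroutine. A network-free sketch, using asyncio.sleep() as a stand-in for a real awaitable call (asyncio.run requires Python 3.7+; on 3.6 use loop.run_until_complete):

```python
import asyncio

async def get_api_results():
    # `async def` plus `yield` makes this an async generator
    for n in range(1, 4):
        await asyncio.sleep(0.01)  # stand-in for a real awaitable HTTP call
        yield n, n * 10

async def main():
    results = []
    # Must use `async for` inside a coroutine; a plain `for` loop
    # raises "TypeError: 'async_generator' object is not iterable"
    async for pair in get_api_results():
        results.append(pair)
    return results

results = asyncio.run(main())
print(results)  # [(1, 10), (2, 20), (3, 30)]
```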
