Python aiohttp/asyncio - how to process returned data


Problem description

I'm in the process of moving some synchronous code to asyncio using aiohttp. The synchronous code takes 15 minutes to run, so I'm hoping to improve on that.

I have some working code which gets data from some URLs and returns the body of each. But this is just against one lab site; I have 70+ actual sites.

So if I add a loop to create a list of all the URLs for all sites, that would put 700 URLs in a list to be processed. I don't think processing them is a problem?
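For illustration, a minimal sketch of building that list, assuming hypothetical sites and targets lists (the helper name and both lists are made up, not part of the original code):

def build_urls(sites, targets):
    # Hypothetical helper: one ping and one traceroute URL per (site, target) pair.
    urls = []
    for ip in sites:
        for target in targets:
            urls.append('http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip, target))
            urls.append('http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip, target))
    return urls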

But doing 'stuff' with the results is the part I'm not sure how to program. I already have code that will do 'stuff' to each of the results that is returned, but I'm not sure how to run it against the right type of result.

When the code runs, does it process all the URLs and, depending on how long each takes, return them in an unknown order?

Do I need a function that can process any type of result?

import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup

def page_content(page):
    return BeautifulSoup(page, 'html.parser')


async def fetch(session, url):
    with aiohttp.Timeout(15, loop=session.loop):
        async with session.get(url) as response:
            return page_content(await response.text())

async def get_url_data(urls, username, password):
    tasks = []
    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
        for i in urls:
            task = asyncio.ensure_future(fetch(session, i))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        # you now have all response bodies in this variable
        for i in responses:
            print(i.title.text)
        return responses


def main():
    username = 'monitoring'
    password = '*********'
    ip = '10.10.10.2'
    urls = [
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'),
        ]
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_url_data(urls,username,password))
    data = loop.run_until_complete(future)
    print(data)

if __name__ == "__main__":
    main()
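Note that asyncio.gather returns its results in the same order as the awaitables passed to it, even though the requests complete in an unpredictable order, so each result can be paired back with the URL that produced it. A minimal sketch of dispatching each parsed page to the right handler (process_ping and process_traceroute are hypothetical placeholders for the existing 'stuff' code):

def process_responses(urls, responses):
    # asyncio.gather preserves input order, so zip() lines results up with their URLs.
    for url, soup in zip(urls, responses):
        if '/ping/' in url:
            process_ping(url, soup)        # hypothetical handler for ping output
        elif '/traceroute/' in url:
            process_traceroute(url, soup)  # hypothetical handler for traceroute output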


Answer

Here's an example using concurrent.futures.ProcessPoolExecutor. If it is created without specifying max_workers, the implementation uses os.cpu_count() instead. Also note that asyncio.wrap_future is public but undocumented. Alternatively, there's AbstractEventLoop.run_in_executor (a sketch of that variant follows the example below).

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
import lxml.html


def process_page(html):
    '''Meant for CPU-bound workload'''
    tree = lxml.html.fromstring(html)
    return tree.find('.//title').text


async def fetch_page(url, session):
    '''Meant for IO-bound workload'''
    async with session.get(url, timeout=15) as res:
        return await res.text()


async def process(url, session, pool):
    html = await fetch_page(url, session)
    return await asyncio.wrap_future(pool.submit(process_page, html))


async def dispatch(urls):
    pool = ProcessPoolExecutor()
    async with aiohttp.ClientSession() as session:
        coros = (process(url, session, pool) for url in urls)
        return await asyncio.gather(*coros)


def main():
    urls = [
        'https://stackoverflow.com/',
        'https://serverfault.com/',
        'https://askubuntu.com/',
        'https://unix.stackexchange.com/'
    ]
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls))
    print(result)

if __name__ == '__main__':
    main()
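As noted above, AbstractEventLoop.run_in_executor can be used in place of asyncio.wrap_future(pool.submit(...)). A minimal sketch of process() rewritten that way, reusing fetch_page, process_page, session and pool from the example above:

async def process(url, session, pool):
    # Same flow as above, but dispatching the CPU-bound work via run_in_executor.
    html = await fetch_page(url, session)
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(pool, process_page, html)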
