Parallelize calls to an API with a hard limit per minute

Question

I am trying to parallelize calls to an API. The API has a limit of 1,200 calls per minute before it stops responding. What is the most efficient way to do this asynchronously while staying below the limit?

import re
import asyncio
import aiohttp

def remove_html_tags(text):
    """Remove html tags from a string"""
    clean = re.compile('<.*?>')
    return re.sub(clean, ' ', text)

async def getRez(df, url):
    async with aiohttp.ClientSession() as session:
        auth = aiohttp.BasicAuth('username', pwd)
        r = await session.get(url, auth=auth)
        if r.status == 200:
            content = await r.text()
            text = remove_html_tags(str(content))
        else:
            text = '500 Server Error'
        df.loc[df['url'] == url, ['RezText']] = [[text]]
        df['wordCount'] = df['RezText'].apply(lambda x: len(str(x).split(" ")))
        data = df[df["RezText"] != "500 Server Error"]

async def main(df):
    df['RezText'] = None
    await asyncio.gather(*[getRez(df, url) for url in df['url']])

loop = asyncio.get_event_loop()
loop.run_until_complete(main(data))  # data: a DataFrame with a 'url' column

Answer

1,200 calls per minute equates to 20 calls per second, so you can split your requests into batches of 20 and sleep for a second between batches.
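The batching arithmetic can be sketched in isolation before looking at the full example. This is pure Python with no network access; the `chunk` helper and the `urls` list are illustrative names, not part of the answer's code:

```python
# Chunk a list of URLs into groups of at most `size` (20 here), so
# each group can be issued together, followed by a one-second sleep.
def chunk(items, size):
    """Split items into consecutive batches of at most `size`."""
    return [items[i:i + size] for i in range(0, len(items), size)]

urls = ["http://httpbin.org/anything?key=a%d" % i for i in range(111)]
batches = chunk(urls, 20)
print(len(batches))       # 6 batches: five of 20 and a final one of 11
print(len(batches[-1]))   # 11
```

With 111 URLs this yields five full batches and one remainder batch, which is exactly the "last batch" case handled separately in the example below.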

Another option would be to use aiohttp.TCPConnector(limit=20) for the client session, but that only limits the number of concurrent requests, so you could end up making more requests (if the API responds in under a second) or fewer (if the API responds in more than a second); see this related question.
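A minimal sketch of the connector-based alternative, showing only how the connector is wired into the session; no requests are actually made here:

```python
import asyncio
import aiohttp

async def main():
    # Cap the session at 20 concurrent connections; request number 21
    # would wait until one of the 20 in-flight requests completes.
    connector = aiohttp.TCPConnector(limit=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        print(connector.limit)  # 20
        # session.get(...) calls would queue behind the limit here

asyncio.run(main())
```

Note that this caps concurrency, not rate, which is why the answer prefers batching for a hard per-minute limit.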

Batch example:

# python 3.7+
import aiohttp
import asyncio

async def fetch(session, url):
    data = None
    async with session.get(url) as response:
        if response.status != 200:
            text = await response.text()
            print("cannot retrieve %s: status: %d, reason: %s" % (url, response.status, text))
        else:
            data = await response.json()
    return data

async def main(n):
    print("starting")
    session = aiohttp.ClientSession()
    tasks = []
    batch = []
    for i in range(n):
        batch.append("http://httpbin.org/anything?key=a%d" % i)
        if len(batch) >= 20:
            print("issuing batch %d:%d" % (i - 20 + 1, i + 1))
            for url in batch:
                task = asyncio.create_task(fetch(session, url))
                tasks.append(task)
            batch = []
            await asyncio.sleep(1)
    if batch:  # if batch length does not divide n evenly consume last batch
        print("issuing last batch %d:%d" % (n - len(batch), n))
        for url in batch:
            task = asyncio.create_task(fetch(session, url))
            tasks.append(task)  # append the task, not a second coroutine
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    await session.close()
    for response in responses:
        assert "args" in response
        # note that the responses will be in the order in which the requests were made
    print("finished")

if __name__ == "__main__":
    asyncio.run(main(111))

Output

starting
issuing batch 0:20
issuing batch 20:40
issuing batch 40:60
issuing batch 60:80
issuing batch 80:100
issuing last batch 100:111
finished

The important bits here are asyncio.create_task (creates a task and starts it, returning a Task object), await asyncio.sleep(1) (used to throttle requests) and await asyncio.gather (waits for all tasks to finish running).
For Python < 3.7 you can use asyncio.ensure_future instead of asyncio.create_task.
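The substitution mentioned above can be sketched as follows. The `work` coroutine is a hypothetical stand-in for a real request; on Python < 3.7 you would also replace asyncio.run with a loop.run_until_complete call:

```python
import asyncio

async def work(i):
    await asyncio.sleep(0)  # stand-in for a real request
    return i * 2

async def main():
    # asyncio.ensure_future plays the role of asyncio.create_task:
    # it schedules the coroutine and returns a Task that gather can await.
    tasks = [asyncio.ensure_future(work(i)) for i in range(3)]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())  # on <3.7: asyncio.get_event_loop().run_until_complete(main())
print(results)  # [0, 2, 4]
```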
