Combining semaphore and time limiting in python-trio with asks HTTP request


Question


I'm trying to use Python in an async manner in order to speed up my requests to a server. The server has a slow response time (often several seconds, but sometimes faster than a second), yet handles parallel requests well. I have no access to this server and can't change anything about it. So, I have a big list of URLs (pages in the code below) which I know beforehand, and I want to speed up their loading by making NO_TASKS = 5 requests at a time. On the other hand, I don't want to overload the server, so I want a minimum pause of 1 second between requests (i.e. a limit of 1 request per second).


So far I have successfully implemented the semaphore part (five requests at a time) using a Trio queue.

import asks
import time
import trio

NO_TASKS = 5


asks.init('trio')
asks_session = asks.Session()
queue = trio.Queue(NO_TASKS)
next_request_at = 0
results = []


pages = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]


async def async_load_page(url):
    # Throttling attempt: remember the earliest time the next request may
    # start and sleep until then (this is the part that doesn't work).
    global next_request_at
    sleep = next_request_at
    next_request_at = max(trio.current_time() + 1, next_request_at)
    await trio.sleep_until(sleep)
    next_request_at = max(trio.current_time() + 1, next_request_at)
    print('start loading page {} at {} seconds'.format(url, trio.current_time()))
    req = await asks_session.get(url)
    results.append(req.text)


async def producer(url):
    await queue.put(url)  


async def consumer():
    while True:
        if queue.empty():
            print('queue empty')
            return
        url = await queue.get()
        await async_load_page(url)


async def main():
    async with trio.open_nursery() as nursery:
        for page in pages:
            nursery.start_soon(producer, page)
        await trio.sleep(0.2)
        for _ in range(NO_TASKS):
            nursery.start_soon(consumer)


start = time.time()
trio.run(main)


However, I'm missing the implementation of the limiting part, i.e. the limit of max. 1 request per second. You can see my attempt above (the first five lines of async_load_page), but as you can see when you execute the code, it is not working:

start loading page http://www.reuters.com/ at 58097.12261669573 seconds
start loading page http://www.python.org at 58098.12367392373 seconds
start loading page http://www.pypy.org at 58098.12380622773 seconds
start loading page http://www.macrumors.com/ at 58098.12389389973 seconds
start loading page http://www.cisco.com at 58098.12397854373 seconds
start loading page http://arstechnica.com/ at 58098.12405119873 seconds
start loading page http://www.facebook.com at 58099.12458010273 seconds
start loading page http://www.twitter.com at 58099.37738939873 seconds
start loading page http://www.perl.org at 58100.37830828273 seconds
start loading page http://www.cnbc.com/ at 58100.91712723473 seconds
start loading page http://abcnews.go.com/ at 58101.91770178373 seconds
start loading page http://www.jython.org at 58102.91875295573 seconds
start loading page https://www.yahoo.com/ at 58103.91993155273 seconds
start loading page http://www.cnn.com at 58104.48031027673 seconds
queue empty
queue empty
queue empty
queue empty
queue empty


I've spent some time searching for answers but couldn't find any.

Answer


One of the ways to achieve your goal would be using a mutex acquired by a worker before sending a request and released in a separate task after some interval:

from typing import Iterator

import asks
import trio


async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
    # Using binary `trio.Semaphore` to be able
    # to release it from a separate task.
    mutex = trio.Semaphore(1)

    async def tick():
        await trio.sleep(throttle)
        mutex.release()

    async def worker():
        for url in urls:
            await mutex.acquire()
            nursery.start_soon(tick)
            response = await asks.get(url)
            responses.append(response)

    async with trio.open_nursery() as nursery:
        for _ in range(n_workers):
            nursery.start_soon(worker)


If a worker gets a response in less than throttle seconds, it will block on await mutex.acquire(). Otherwise the mutex will be released by tick and another worker will be able to acquire it.


This is similar to how the leaky bucket algorithm works:

  • Workers waiting for the mutex are like water in a bucket.
  • Each tick is like a bucket leaking at a constant rate.
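As an illustration of that analogy (not part of the original answer), a minimal leaky bucket can be sketched in plain Python with an injected clock, so the draining is deterministic; the capacity and leak_rate parameters are made up for the example:

```python
class LeakyBucket:
    def __init__(self, capacity, leak_rate, clock):
        self.capacity = capacity    # max amount of queued "water"
        self.leak_rate = leak_rate  # units drained per second
        self.clock = clock          # injected time source, for testability
        self.level = 0.0
        self.last = clock()

    def allow(self):
        # Drain the bucket by the time elapsed since the last check.
        now = self.clock()
        self.level = max(0.0, self.level - (now - self.last) * self.leak_rate)
        self.last = now
        # Admit the request only if one more unit still fits.
        if self.level + 1 <= self.capacity:
            self.level += 1
            return True
        return False

# Usage with a fake clock: two requests fit at t=0, the third is rejected,
# and after one second of leaking there is room again.
t = [0.0]
bucket = LeakyBucket(capacity=2, leak_rate=1.0, clock=lambda: t[0])
print(bucket.allow(), bucket.allow(), bucket.allow())  # → True True False
t[0] = 1.0
print(bucket.allow())  # → True
```

In the answer's code, tick plays the role of the constant-rate leak, and workers blocked on the mutex are the queued water.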


If you add a bit of logging just before sending a request you should get an output similar to this:

   0.00169 started
  0.001821 n_workers: 5
  0.001833 throttle: 1
  0.002152 fetching https://httpbin.org/delay/4
     1.012 fetching https://httpbin.org/delay/2
     2.014 fetching https://httpbin.org/delay/2
     3.017 fetching https://httpbin.org/delay/3
      4.02 fetching https://httpbin.org/delay/0
     5.022 fetching https://httpbin.org/delay/2
     6.024 fetching https://httpbin.org/delay/2
     7.026 fetching https://httpbin.org/delay/3
     8.029 fetching https://httpbin.org/delay/0
     9.031 fetching https://httpbin.org/delay/0
     10.61 finished
