Combining semaphore and time limiting in python-trio with asks http request
Question
I'm trying to use Python in an async manner in order to speed up my requests to a server. The server has a slow response time (often several seconds, but sometimes faster than a second), and it handles parallel requests well. I have no access to this server and can't change anything about it. So, I have a big list of URLs (pages in the code below) which I know beforehand, and I want to speed up their loading by making NO_TASKS = 5 requests at a time. On the other hand, I don't want to overload the server, so I want a minimum pause of 1 second between requests (i.e. a limit of 1 request per second).
So far I have successfully implemented the semaphore part (five requests at a time) using a Trio queue.
import asks
import time
import trio

NO_TASKS = 5

asks.init('trio')
asks_session = asks.Session()
queue = trio.Queue(NO_TASKS)
next_request_at = 0
results = []

pages = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]


async def async_load_page(url):
    global next_request_at
    sleep = next_request_at
    next_request_at = max(trio.current_time() + 1, next_request_at)
    await trio.sleep_until(sleep)
    next_request_at = max(trio.current_time() + 1, next_request_at)
    print('start loading page {} at {} seconds'.format(url, trio.current_time()))
    req = await asks_session.get(url)
    results.append(req.text)


async def producer(url):
    await queue.put(url)


async def consumer():
    while True:
        if queue.empty():
            print('queue empty')
            return
        url = await queue.get()
        await async_load_page(url)


async def main():
    async with trio.open_nursery() as nursery:
        for page in pages:
            nursery.start_soon(producer, page)
        await trio.sleep(0.2)
        for _ in range(NO_TASKS):
            nursery.start_soon(consumer)


start = time.time()
trio.run(main)
However, I'm missing the implementation of the limiting part, i.e. the implementation of max. 1 request per second. You can see my attempt to do so above (the first five lines of async_load_page), but as you can see when you execute the code, this is not working:
start loading page http://www.reuters.com/ at 58097.12261669573 seconds
start loading page http://www.python.org at 58098.12367392373 seconds
start loading page http://www.pypy.org at 58098.12380622773 seconds
start loading page http://www.macrumors.com/ at 58098.12389389973 seconds
start loading page http://www.cisco.com at 58098.12397854373 seconds
start loading page http://arstechnica.com/ at 58098.12405119873 seconds
start loading page http://www.facebook.com at 58099.12458010273 seconds
start loading page http://www.twitter.com at 58099.37738939873 seconds
start loading page http://www.perl.org at 58100.37830828273 seconds
start loading page http://www.cnbc.com/ at 58100.91712723473 seconds
start loading page http://abcnews.go.com/ at 58101.91770178373 seconds
start loading page http://www.jython.org at 58102.91875295573 seconds
start loading page https://www.yahoo.com/ at 58103.91993155273 seconds
start loading page http://www.cnn.com at 58104.48031027673 seconds
queue empty
queue empty
queue empty
queue empty
queue empty
I've spent some time searching for answers but couldn't find any.
Answer
One of the ways to achieve your goal would be using a mutex acquired by a worker before sending a request and released in a separate task after some interval:
from typing import Iterator

import asks
import trio


async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
    # Using binary `trio.Semaphore` to be able
    # to release it from a separate task.
    mutex = trio.Semaphore(1)

    async def tick():
        await trio.sleep(throttle)
        mutex.release()

    async def worker():
        for url in urls:
            await mutex.acquire()
            nursery.start_soon(tick)
            response = await asks.get(url)
            responses.append(response)

    async with trio.open_nursery() as nursery:
        for _ in range(n_workers):
            nursery.start_soon(worker)
If a worker gets a response sooner than after throttle seconds, it will block on await mutex.acquire(). Otherwise the mutex will be released by tick and another worker will be able to acquire it.
This is similar to how the leaky bucket algorithm works:
- Workers waiting for the mutex are like water in a bucket.
- Each tick is like a bucket leaking at a constant rate.
If you add a bit of logging just before sending a request you should get an output similar to this:
0.00169 started
0.001821 n_workers: 5
0.001833 throttle: 1
0.002152 fetching https://httpbin.org/delay/4
1.012 fetching https://httpbin.org/delay/2
2.014 fetching https://httpbin.org/delay/2
3.017 fetching https://httpbin.org/delay/3
4.02 fetching https://httpbin.org/delay/0
5.022 fetching https://httpbin.org/delay/2
6.024 fetching https://httpbin.org/delay/2
7.026 fetching https://httpbin.org/delay/3
8.029 fetching https://httpbin.org/delay/0
9.031 fetching https://httpbin.org/delay/0
10.61 finished