How do connections recycle in a multiprocess pool serving requests from a single requests.Session object in python?


Problem Description

Below is the complete code, simplified for the question.

ids_to_check returns a list of ids. For my testing, I used a list of 13 random strings.

#!/usr/bin/env python3
import time
from multiprocessing.dummy import Pool as ThreadPool, current_process as threadpool_process
import requests

def ids_to_check():
    # Placeholder from the original post: some_calls() builds id_list,
    # a list of ids (13 random strings in the author's test).
    some_calls()
    return id_list

def execute_task(task_id):
    # Parameter renamed from `id`: the original name shadowed the builtin
    # id() used in the print() below, which would raise a TypeError.
    url = f"https://myserver.com/todos/{task_id}"
    json_op = s.get(url, verify=False).json()
    value = json_op['id']
    print(str(value) + '-' + str(threadpool_process()) + str(id(s)))

def main():
    pool = ThreadPool(processes=20)
    while True:
        pool.map(execute_task, ids_to_check())
        print("Let's wait for 10 seconds")
        time.sleep(10)

if __name__ == "__main__":
    s = requests.Session()
    # headers.update is a method and must be called; assigning to it
    # (s.headers.update = {...}) silently leaves the headers unchanged.
    s.headers.update({
        'Accept': 'application/json'
    })

    main()
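To run the snippet as-is, some_calls() and id_list need a concrete implementation; a minimal, hypothetical stand-in matching the description above (13 random string ids) could look like this:

import random
import string

def ids_to_check():
    # Hypothetical stand-in for the author's some_calls(): 13 random 8-char ids.
    return [''.join(random.choices(string.ascii_lowercase, k=8)) for _ in range(13)]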

Output:

4-<DummyProcess(Thread-2, started daemon 140209222559488)>140209446508360
5-<DummyProcess(Thread-5, started daemon 140209123481344)>140209446508360
7-<DummyProcess(Thread-6, started daemon 140209115088640)>140209446508360
2-<DummyProcess(Thread-11, started daemon 140208527894272)>140209446508360
None-<DummyProcess(Thread-1, started daemon 140209230952192)>140209446508360
10-<DummyProcess(Thread-4, started daemon 140209131874048)>140209446508360
12-<DummyProcess(Thread-7, started daemon 140209106695936)>140209446508360
8-<DummyProcess(Thread-3, started daemon 140209140266752)>140209446508360
6-<DummyProcess(Thread-12, started daemon 140208519501568)>140209446508360
3-<DummyProcess(Thread-13, started daemon 140208511108864)>140209446508360
11-<DummyProcess(Thread-10, started daemon 140208536286976)>140209446508360
9-<DummyProcess(Thread-9, started daemon 140209089910528)>140209446508360
1-<DummyProcess(Thread-8, started daemon 140209098303232)>140209446508360
Let's wait for 10 seconds
None-<DummyProcess(Thread-14, started daemon 140208502716160)>140209446508360
3-<DummyProcess(Thread-20, started daemon 140208108455680)>140209446508360
1-<DummyProcess(Thread-19, started daemon 140208116848384)>140209446508360
7-<DummyProcess(Thread-17, started daemon 140208133633792)>140209446508360
6-<DummyProcess(Thread-6, started daemon 140209115088640)>140209446508360
4-<DummyProcess(Thread-4, started daemon 140209131874048)>140209446508360
9-<DummyProcess(Thread-16, started daemon 140208485930752)>140209446508360
5-<DummyProcess(Thread-15, started daemon 140208494323456)>140209446508360
2-<DummyProcess(Thread-2, started daemon 140209222559488)>140209446508360
8-<DummyProcess(Thread-18, started daemon 140208125241088)>140209446508360
11-<DummyProcess(Thread-1, started daemon 140209230952192)>140209446508360
10-<DummyProcess(Thread-11, started daemon 140208527894272)>140209446508360
12-<DummyProcess(Thread-5, started daemon 140209123481344)>140209446508360
Let's wait for 10 seconds
None-<DummyProcess(Thread-3, started daemon 140209140266752)>140209446508360
2-<DummyProcess(Thread-10, started daemon 140208536286976)>140209446508360
1-<DummyProcess(Thread-12, started daemon 140208519501568)>140209446508360
4-<DummyProcess(Thread-9, started daemon 140209089910528)>140209446508360
5-<DummyProcess(Thread-14, started daemon 140208502716160)>140209446508360
9-<DummyProcess(Thread-6, started daemon 140209115088640)>140209446508360
8-<DummyProcess(Thread-16, started daemon 140208485930752)>140209446508360
7-<DummyProcess(Thread-4, started daemon 140209131874048)>140209446508360
3-<DummyProcess(Thread-20, started daemon 140208108455680)>140209446508360
6-<DummyProcess(Thread-8, started daemon 140209098303232)>140209446508360
12-<DummyProcess(Thread-13, started daemon 140208511108864)>140209446508360
10-<DummyProcess(Thread-7, started daemon 140209106695936)>140209446508360
11-<DummyProcess(Thread-19, started daemon 140208116848384)>140209446508360
Let's wait for 10 seconds
.
.

My observations:

  • Multiple connections are created (i.e., one connection per process), but the session object remains the same throughout execution (the session object id is identical in every line).
  • The ss output shows the connections keep getting recycled; I cannot identify any particular pattern/timeout in the recycling.
  • If I reduce the process count to a smaller number (e.g. 5), the connections are not recycled.

I do not understand how/why the connections are being recycled, and why they are not recycled if I reduce the process count. I have tried disabling the garbage collector (import gc; gc.disable()) and the connections are still recycled.
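A likely explanation (my note, not from the original post): requests' default HTTPAdapter keeps at most 10 connections per host (pool_connections=10, pool_maxsize=10). With 20 threads, more than 10 connections can be in flight at once, and every connection returned to an already-full pool is discarded, i.e. "recycled"; with 5 threads the pool never overflows, so the same connections are reused. urllib3 logs each discard, so the behaviour can be observed directly:

import logging

# Surface urllib3's connection-pool diagnostics. With the 20-thread pool
# above, each overflow shows up as a warning like:
#   "Connection pool is full, discarding connection: myserver.com"
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("urllib3.connectionpool").setLevel(logging.DEBUG)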

I would like the created connections to be kept alive until they reach a maximum number of requests. I think it would also work without sessions, using a keep-alive Connection header.
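A minimal sketch of that idea (my addition, assuming the same shared session s and a single target host): mount an HTTPAdapter whose pool_maxsize matches the thread count, with pool_block=True so a thread waits for a free connection instead of opening a throwaway one.

import requests
from requests.adapters import HTTPAdapter

s = requests.Session()
# Pool sized to ThreadPool(processes=20); pool_block=True makes threads
# block for a free connection rather than create-and-discard extras.
adapter = HTTPAdapter(pool_maxsize=20, pool_block=True)
s.mount('https://', adapter)
s.mount('http://', adapter)

With this in place the pool never overflows, so established connections stay alive across requests (subject to server-side keep-alive timeouts).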

But I am curious to know what causes these session connections to keep recycling when the process pool size is high.

I can reproduce this issue with any server, so it is probably not server-dependent.

Answer

I solved the same issue for myself by creating a session for each process and parallelizing the request execution. At first I used multiprocessing.dummy too, but I faced the same issue as yours and changed it to concurrent.futures.thread.ThreadPoolExecutor.

Here is my solution.

from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial

from requests import Session, Response
from requests.adapters import HTTPAdapter

def thread_pool_execute(iterables, method, pool_size=30) -> list:
    """Run requests across a thread pool; returns a list of responses."""
    session = Session()
    # Raise the per-host connection pool to the worker count so connections
    # are reused instead of being discarded and reopened (the default is 10).
    session.mount('https://', HTTPAdapter(pool_maxsize=pool_size))
    session.mount('http://', HTTPAdapter(pool_maxsize=pool_size))
    worker = partial(method, session)
    with ThreadPoolExecutor(pool_size) as pool:
        results = pool.map(worker, iterables)
    session.close()
    return list(results)

def simple_request(session, url) -> Response:
    return session.get(url)

# list_of_urls is the caller's iterable of target URLs.
response_list = thread_pool_execute(list_of_urls, simple_request)

I tested this against sitemaps with 200k urls using pool_size=150 without any problems; it is limited only by the target host's configuration.

