Django ORM leaks connections when using ThreadPoolExecutor

Problem description

I'm using ThreadPoolExecutor to speed up data processing. The problem is that the thread pool creates new database connections and Django doesn't close them. I do have CONN_MAX_AGE in settings.py and I already tried to call django.db.close_old_connections().

Here is a code example:

from concurrent.futures import ThreadPoolExecutor, wait

def compute(job):
    result = FooModel.objects.filter(...).aggregate(...)
    # aggregate() returns a dict; this assumes its keys match BarModel's field names
    return BarModel.objects.create(**result)

def process(dataset):
    thread_pool = ThreadPoolExecutor(max_workers=20)
    futures = []

    for job in dataset:
        futures.append(thread_pool.submit(compute, job))

    # wait() returns a (done, not_done) named tuple
    results = [r.result() for r in wait(futures).done]
    return results

for i in range(0, 100):
    process(['foo', 'bar', 'qux'])

Can the Django ORM close idle DB connections that were opened in another thread?


UPD: Interestingly, Django doesn't even know about these connections:

>>> from django.db import connections
>>> print(len(connections.all()))
2

mypostgresdb=# select count(*) from pg_stat_activity;
 count 
-------
   182
(1 row)

And all worker threads had definitely exited:

>>> # worker threads have exited:
>>> import threading
>>> threading.enumerate()
[<_MainThread(MainThread, started 140660203321088)>]
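Django keeps a separate connection per thread, so `connections.all()` only reports the connections of the thread that calls it. The toy model below is a stdlib-only sketch of that behavior, not Django code: `FakeConnection` and `ThreadLocalHandler` are hypothetical stand-ins. It shows why the main thread sees a single connection while the "database" has one per worker thread. It only counts opens; it does not model how a real driver or garbage collection might eventually close a connection.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeConnection:
    """Hypothetical stand-in for a DB connection; counts how many were opened."""
    open_count = 0
    _lock = threading.Lock()

    def __init__(self):
        with FakeConnection._lock:
            FakeConnection.open_count += 1


class ThreadLocalHandler:
    """Hypothetical miniature of a per-thread connection handler."""

    def __init__(self):
        self._local = threading.local()

    def get(self):
        # Lazily open one connection per thread, like the first ORM query would.
        if not hasattr(self._local, "conn"):
            self._local.conn = FakeConnection()
        return self._local.conn

    def all(self):
        # Only the *calling* thread's connection is visible here.
        conn = getattr(self._local, "conn", None)
        return [conn] if conn is not None else []


handler = ThreadLocalHandler()


def job(_):
    handler.get()  # the "query" opens this worker thread's connection


handler.get()  # the main thread opens its own connection
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(job, range(20)))

print(len(handler.all()))         # 1: the main thread only sees its own connection
print(FakeConnection.open_count)  # between 2 and 5, depending on how many workers ran jobs
```

Nothing in this model ever closes the workers' connections, which mirrors the gap between `connections.all()` and `pg_stat_activity` above.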

Solution

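My guess is that the ThreadPoolExecutor itself is not what creates the DB connections; the jobs running in its threads are what hold them. Django keeps a separate connection per thread, and it only enforces CONN_MAX_AGE (by calling close_old_connections()) when a request starts or finishes, which never happens inside a worker thread. Calling close_old_connections() from the main thread only affects the main thread's own connections. I've had to deal with this before.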

I ended up building this wrapper to make sure the DB connection is closed manually whenever a job finishes in a ThreadPoolExecutor. This keeps connections from leaking; so far I haven't seen any leaks while using this code.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(),
    this will wrap the function, and make sure that close_django_db_connection() is called
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overridden.
    """
    def close_django_db_connection(self):
        # `connection` resolves to the calling thread's default connection,
        # so this closes the worker thread's connection, not the main thread's.
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                # Runs in the worker thread, even if fn raised.
                self.close_django_db_connection()
        return new_func

    def submit(self, fn, /, *args, **kwargs):
        # The `/` makes fn positional-only (Python 3.8+), matching the signature of
        # ThreadPoolExecutor.submit() in current CPython. It replaces the manual
        # *args unpacking once copied from
        # https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py
        fn = self.generate_thread_closing_wrapper(fn)
        # Plain super() keeps working in further subclasses, where
        # super(self.__class__, self) would recurse forever.
        return super().submit(fn, *args, **kwargs)
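The docstring's claim that overriding submit() is enough because map() calls submit() can be checked without Django. This sketch uses a hypothetical `CountingExecutor` and relies on CPython's `Executor.map()`, which submits every item through `self.submit()` in the calling thread before yielding results; other Python implementations are not guaranteed to behave the same way.

```python
from concurrent.futures import ThreadPoolExecutor


class CountingExecutor(ThreadPoolExecutor):
    """Hypothetical executor that counts how many calls reach submit()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.submit_calls = 0

    def submit(self, fn, /, *args, **kwargs):
        # map() calls submit() from the caller's thread, so no lock is needed here.
        self.submit_calls += 1
        return super().submit(fn, *args, **kwargs)


with CountingExecutor(max_workers=3) as executor:
    squares = list(executor.map(lambda x: x * x, range(10)))

print(squares)                # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(executor.submit_calls)  # 10
```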

Then you can just use this:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...and be confident that the connections will close.
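If subclassing the executor feels heavy, the same try/finally can be applied per function with a decorator and a plain ThreadPoolExecutor; in a Django project the close step would be `django.db.connection.close()`. The sketch below stays stdlib-only so the bookkeeping can be verified: `get_connection`, `close_connection` and `closes_db_connection` are hypothetical names, and the thread-local "connection" is a stand-in for Django's. It also checks that the connection is closed when a job raises.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_lock = threading.Lock()
_local = threading.local()  # stand-in for Django's per-thread connection storage
stats = {"opened": 0, "closed": 0}


def get_connection():
    """Open this thread's stand-in connection on first use (like a first ORM query)."""
    if getattr(_local, "conn", None) is None:
        _local.conn = object()
        with _lock:
            stats["opened"] += 1
    return _local.conn


def close_connection():
    """Stand-in for connection.close(): close this thread's connection, if any."""
    if getattr(_local, "conn", None) is not None:
        _local.conn = None
        with _lock:
            stats["closed"] += 1


def closes_db_connection(fn):
    """Decorator: run fn, then always close the calling thread's connection."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        finally:
            close_connection()
    return wrapper


@closes_db_connection
def compute(job):
    get_connection()  # pretend to query the database
    if job == "bad":
        raise ValueError(job)
    return job.upper()


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(compute, job) for job in ["foo", "bar", "bad", "qux"]]
    results = [f.result() if f.exception() is None else None for f in futures]

print(results)  # ['FOO', 'BAR', None, 'QUX']
print(stats)    # {'opened': 4, 'closed': 4}
```

Every open is matched by a close regardless of how the pool assigns jobs to threads, because each job closes its connection before the thread picks up the next one.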
