Django ORM leaks connections when using ThreadPoolExecutor
I'm using ThreadPoolExecutor to speed up data processing. The problem is that the thread pool creates new database connections and Django doesn't close them. I do have CONN_MAX_AGE set in settings.py, and I have already tried calling django.db.close_old_connections().
Here is a code example:
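from concurrent.futures import ThreadPoolExecutor, wait

def compute(job):
    result = FooModel.objects.filter(...).aggregate(...)
    # aggregate() returns a dict, so unpack it into keyword arguments
    return BarModel.objects.create(**result)

def process(dataset):
    thread_pool = ThreadPoolExecutor(max_workers=20)
    futures = []
    for job in dataset:
        futures.append(thread_pool.submit(compute, job))
    # wait() returns (done, not_done); collect results from the done set
    results = [f.result() for f in wait(futures).done]
    return results

for i in range(100):
    process(['foo', 'bar', 'qux'])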
Is Django ORM able to terminate idle DB connections if they were started in another thread?
UPD: Interestingly, Django doesn't even know about these connections:
>>> from django.db import connections
>>> print(len(connections.all()))
2
mypostgresdb=# select count(*) from pg_stat_activity;
count
-------
182
(1 row)
And all worker threads had definitely exited by then:
>>> # worker threads have exited:
>>> import threading
>>> threading.enumerate()
[<_MainThread(MainThread, started 140660203321088)>]
My guess is that the ThreadPoolExecutor is not what is creating the DB connection, but the threaded jobs are the ones holding the connection. I've had to deal with this already.
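That guess can be illustrated without Django. The sketch below is a simplified model, not Django's actual code: `FakeConnectionHandler` is a hypothetical stand-in that, like Django's connection handler as far as I understand it, keeps one connection per thread in thread-local storage. The main thread then only sees its own connection, even though every worker opened one, which would match the gap between `connections.all()` and `pg_stat_activity` in the question.

```python
import threading


class FakeConnectionHandler:
    """Hypothetical stand-in for django.db.connections: one connection per thread."""

    def __init__(self):
        self._local = threading.local()
        self._lock = threading.Lock()
        self.opened = 0  # what the database server would count

    def get(self):
        # Lazily "open" a connection the first time a thread asks for one.
        if not hasattr(self._local, "conn"):
            with self._lock:
                self.opened += 1
            self._local.conn = object()
        return self._local.conn

    def all(self):
        # Only the calling thread's connection is visible here.
        conn = getattr(self._local, "conn", None)
        return [conn] if conn is not None else []


handler = FakeConnectionHandler()
handler.get()  # the main thread opens its own connection


def worker():
    handler.get()  # each worker thread opens a separate connection


threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

visible_from_main = len(handler.all())  # connections the main thread can see
total_opened = handler.opened           # connections opened across all threads
print(visible_from_main, total_opened)
```

Under this model, cleanup called from the main thread cannot reach the workers' connections, so the close has to happen inside each worker thread.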
I ended up building this wrapper to make sure each worker thread's connection is closed manually whenever a job finishes in a ThreadPoolExecutor. It should help ensure connections are not leaked; so far I haven't seen any leaks while using this code.
from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection


class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(),
    this will wrap the function, and make sure that close_django_db_connection() is called
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overridden.
    """

    def close_django_db_connection(self):
        # Closes the default connection belonging to the current (worker) thread.
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
        else:
            # Only self was passed and there is no fn keyword: nothing to run.
            raise TypeError("submit expected at least 1 positional argument, "
                            "got %d" % (len(args) - 1))
        # Name the class explicitly: super(self.__class__, self) would recurse
        # forever if this executor were ever subclassed.
        return super(DjangoConnectionThreadPoolExecutor, self).submit(fn, *args, **kwargs)
Then you can just use this:
with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
    results = list(executor.map(func, args_list))
...and be confident that the connections will close.
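On Python 3.9 and later, where `Executor.submit` takes `fn` as a positional-only parameter, the same idea can probably be written more compactly. The following is a minimal sketch under that assumption, not the answer's original code: `CleanupThreadPoolExecutor` and its `cleanup` argument are hypothetical names, and the cleanup callable is injected so the example runs without Django. In a real project one would presumably pass something like `django.db.connections.close_all`, which closes every connection owned by the calling thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


class CleanupThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical executor that runs a cleanup callable after every job."""

    def __init__(self, *args, cleanup, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup = cleanup

    def submit(self, fn, /, *args, **kwargs):
        @wraps(fn)
        def wrapper(*a, **kw):
            try:
                return fn(*a, **kw)
            finally:
                # Runs in the worker thread, before the future gets its result.
                self._cleanup()

        return super().submit(wrapper, *args, **kwargs)


# Stand-in for connections.close_all(): record which thread ran the cleanup.
cleanup_calls = []


def fake_close():
    cleanup_calls.append(threading.get_ident())


with CleanupThreadPoolExecutor(max_workers=4, cleanup=fake_close) as executor:
    # map() goes through submit(), so every job is wrapped.
    results = list(executor.map(lambda x: x * 2, [1, 2, 3]))

print(results, len(cleanup_calls))
```

Because the cleanup runs in a `finally` block inside the worker, it also fires when the job raises, which is the case where a connection is most likely to be left open.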