Python multiprocessing: how to limit the number of waiting processes?
Question
When running a large number of tasks (with large arguments) using Pool.apply_async, the processes are allocated and go into a waiting state, and there is no limit on the number of waiting tasks. This can end up eating all memory, as in the example below:
import multiprocessing
import numpy as np

def f(a, b):
    return np.linalg.solve(a, b)

def test():
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000, 1000), np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()
I'm searching for a way to limit the waiting queue, in such a way that there is only a limited number of waiting processes, and Pool.apply_async is blocked while the waiting queue is full.
Recommended answer
multiprocessing.Pool has a _taskqueue member of type multiprocessing.Queue, which takes an optional maxsize parameter; unfortunately it constructs it without the maxsize parameter set.
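If you want to see what the task queue actually is on your interpreter before relying on this, you can inspect the private attribute directly. This is only a diagnostic sketch: _taskqueue is a CPython implementation detail and its concrete type varies across versions.

import multiprocessing

if __name__ == '__main__':
    p = multiprocessing.Pool(2)
    # _taskqueue is private; its type differs between Python versions
    # (e.g. a plain queue.Queue in older releases, queue.SimpleQueue on recent CPython).
    print(type(p._taskqueue))
    p.close()
    p.join()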
I'd recommend subclassing multiprocessing.Pool with a copy-paste of multiprocessing.Pool.__init__ that passes maxsize to the _taskqueue constructor.
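Rather than reproducing Pool.__init__ here (it changes between Python versions), the following is a minimal Python 3 sketch of the same idea implemented a different way: a Pool subclass whose apply_async blocks on a semaphore once a chosen number of tasks are queued or running. The names BoundedPool and max_waiting are illustrative assumptions, not part of the standard library.

import multiprocessing.pool
from threading import BoundedSemaphore

class BoundedPool(multiprocessing.pool.Pool):
    """Pool whose apply_async blocks once max_waiting tasks are outstanding."""

    def __init__(self, max_waiting, *args, **kwargs):
        self._submit_sem = BoundedSemaphore(max_waiting)
        super().__init__(*args, **kwargs)

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        # Block the submitting thread until a slot frees up, instead of
        # letting the internal task queue grow without bound.
        self._submit_sem.acquire()

        def releasing(user_cb):
            def wrapper(value):
                self._submit_sem.release()  # task finished, free one slot
                if user_cb is not None:
                    user_cb(value)
            return wrapper

        return super().apply_async(func, args, kwds,
                                   callback=releasing(callback),
                                   error_callback=releasing(error_callback))

With something like this, the loop from the question (pool = BoundedPool(32) followed by the same apply_async calls) keeps at most 32 argument arrays in flight at any time.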
Monkey-patching the object (either the pool or the queue) would also work, but you'd have to monkey-patch pool._taskqueue._maxsize and pool._taskqueue._sem, so it would be quite brittle:
pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)