How to reuse a multiprocessing pool?
Problem Description
At the bottom is the code I have now. It seems to work fine. However, I don't completely understand it. I thought that without .join(), I'd risk the code going on to the next for-loop before the pool finishes executing. Wouldn't we need those 3 commented-out lines?
On the other hand, if I were to go with the .close() and .join() way, is there any way to "reopen" that closed pool instead of calling Pool(6) every time?
import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time


def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)


if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x, 2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)
Answer
A pool of workers is a relatively costly thing to set up, so it should be done (if possible) only once, usually at the beginning of the script.
The pool.map command blocks until all the tasks are completed. After all, it returns a list of the results. It couldn't do that unless mesh_subset has been called on all the inputs and has returned a result for each. In contrast, methods like pool.apply_async do not block. apply_async returns an ApplyResult object with a get method which blocks until it obtains a result from a worker process.
pool.close sets the worker handler's state to CLOSE. This causes the handler to signal the workers to terminate.
The pool.join blocks until all the worker processes have been terminated.
So you don't need to call -- in fact you shouldn't call -- pool.close and pool.join until you are finished with the pool. Once the workers have been sent the signal to terminate (by pool.close), there is no way to "reopen" them. You would need to start a new pool instead.
In your situation, since you do want the loop to wait until all the tasks are completed, there would be no advantage to using pool.apply_async instead of pool.map. But if you were to use pool.apply_async, you could obtain the same result as before by calling get instead of resorting to closing and restarting the pool:
# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population]) for i in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]
When the loops complete, len(population) is unchanged.
If you did NOT want each loop to block until all the tasks are completed, you could use apply_async's callback feature:
N_pop = len(population)
result = []
for i in range(N_iteration):
    for i in range(N_pop):
        pool.apply_async(mesh_subset, [population],
                         callback=result.append)
pool.close()
pool.join()
print(result)
Now, when any mesh_subset returns a return_value, result.append(return_value) is called. The calls to apply_async do not block, so N_iteration * N_pop tasks are pushed into the pool's task queue all at once. But since the pool has 6 workers, at most 6 calls to mesh_subset are running at any given time. As the workers complete the tasks, whichever worker finishes first calls result.append(return_value). So the values in result are unordered. This is different from pool.map, which returns a list whose return values are in the same order as its corresponding list of arguments.
Barring an exception, result will eventually contain N_iteration * N_pop return values once all the tasks complete. Above, pool.close() and pool.join() were used to wait for all the tasks to complete.