How to reuse a multiprocessing pool?


Problem description

At the bottom is the code I have now. It seems to work fine. However, I don't completely understand it. I thought that without .join(), I'd risk the code moving on to the next for-loop before the pool finishes executing. Wouldn't we need those 3 commented-out lines?

On the other hand, if I were to go with the .close() and .join() approach, is there any way to 'reopen' that closed pool instead of creating it with Pool(6) every time?

import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time


def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)


if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x,2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)

Answer

A pool of workers is a relatively costly thing to set up, so it should be done (if possible) only once, usually at the beginning of the script.

The pool.map command blocks until all the tasks are completed. After all, it returns a list of the results. It couldn't do that unless mesh_subset had been called on all the inputs and returned a result for each. In contrast, methods like pool.apply_async do not block. apply_async returns an ApplyResult object with a get method which blocks until it obtains a result from a worker process.

pool.close sets the worker handler's state to CLOSE. This causes the handler to signal the workers to terminate.

pool.join blocks until all the worker processes have terminated.

So you don't need to call -- in fact you shouldn't call -- pool.close and pool.join until you are finished with the pool. Once the workers have been sent the signal to terminate (by pool.close), there is no way to "reopen" them. You would need to start a new pool instead.

In your situation, since you do want the loop to wait until all the tasks are completed, there would be no advantage to using pool.apply_async instead of pool.map. But if you were to use pool.apply_async, you could obtain the same result as before by calling get instead of resorting to closing and restarting the pool:

# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population]) for i in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]

When the loop completes, len(population) is unchanged.

If you did NOT want each loop to block until all the tasks are completed, you could use apply_async's callback feature:

N_pop = len(population)
result = []
for i in range(N_iteration):
    for j in range(N_pop):
        pool.apply_async(mesh_subset, [population], callback=result.append)
pool.close()
pool.join()
print(result)

Now, when any mesh_subset returns a return_value, result.append(return_value) is called. The calls to apply_async do not block, so all N_iteration * N_pop tasks are pushed into the pool's task queue at once. But since the pool has 6 workers, at most 6 calls to mesh_subset are running at any given time. As the workers complete tasks, whichever worker finishes first calls result.append(return_value), so the values in result are unordered. This is different from pool.map, which returns a list whose return values are in the same order as its corresponding list of arguments.

Barring an exception, result will eventually contain N_iteration * N_pop return values once all the tasks complete. Above, pool.close() and pool.join() were used to wait for all the tasks to complete.

