Launch concurrent.futures.ProcessPoolExecutor with initialization?

Question

I'm planning to use concurrent.futures.ProcessPoolExecutor to parallelize execution of functions. According to the documentation, its executor object can only accept a simple function in map. My actual situation involves initialization (loading of data) prior to execution of the 'to-be-parallelized' function. How do I arrange that?

The 'to-be-parallelized' function is called many times in a loop. I don't want it to be re-initialized on every call.

In other words, there's an init function that produces some output for this tbp function. Each child process should have its own copy of that output, because the function depends on it.

Answer

It sounds like you're looking for an equivalent to the initializer/initargs options that multiprocessing.Pool takes. When this answer was written, concurrent.futures.ProcessPoolExecutor had no such option, though there was a patch waiting for review that added it. (This behavior has since been added: starting with Python 3.7, ProcessPoolExecutor accepts initializer and initargs arguments.)

So you can either use multiprocessing.Pool (which might be fine for your use case), wait for that patch to get merged and released (you might be waiting a while :)), or roll your own solution. It turns out it's not too hard to write a wrapper function for map that takes an initializer but only calls it once per process:

import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial

inited = False
initresult = None

def initwrapper(initfunc, initargs, f, x):
    # This is called in the child. `inited` is False the
    # first time it's called in a given worker process and
    # True on every later call there, so initfunc runs
    # only once per process.
    global inited, initresult
    if not inited:
        inited = True
        initresult = initfunc(*initargs)
    return f(x)

def do_init(a,b):
    print('ran init {} {}'.format(a,b))
    return os.getpid() # Just to demonstrate it will be unique per process

def f(x):
    print("Hey there {}".format(x))
    print('initresult is {}'.format(initresult))
    return x+1

def initmap(executor, initializer, initargs, f, it):
    return executor.map(partial(initwrapper, initializer, initargs, f), it)


if __name__ == "__main__":
    with ProcessPoolExecutor(4) as executor:
        out = initmap(executor, do_init, (5,6), f, range(10))
    print(list(out))

Output:

ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
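A variation on the same "once per process" idea, not part of the original answer, replaces the global flag with functools.lru_cache: the cache lives in each worker's own memory, so the first call in a given process does the work and later calls there return the stored result. `get_init_result` is a hypothetical name, and the sketch assumes the init arguments are hashable, since lru_cache requires that.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache, partial


@lru_cache(maxsize=None)
def get_init_result(a, b):
    # First call in a process does the (stand-in) expensive work; later calls
    # with the same arguments in that same process hit the cache instead.
    print("ran init {} {}".format(a, b))
    return os.getpid()


def f(initargs, x):
    # Fetch (or lazily create) this process's init result, then do the real work.
    initresult = get_init_result(*initargs)
    return x + 1, initresult


if __name__ == "__main__":
    with ProcessPoolExecutor(4) as executor:
        results = list(executor.map(partial(f, (5, 6)), range(10)))
    print([value for value, _ in results])  # → [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print("init results seen:", sorted({pid for _, pid in results}))
```

The trade-off is the same as with the wrapper above: initialization happens lazily on a worker's first task rather than at process start.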
