How to use non-top-level functions in parallelization?
Question
I'd like to use multiprocessing for a resource-heavy computation in code I write, as shown in this watered-down example:
import numpy as np
import multiprocessing as multiproc

def function(r, phi, z, params):
    """returns an array of the timepoints and the corresponding values
    (demanding computation in actual code, with iFFT and stuff)"""
    times = np.array([1., 2., 3.])
    tdependent_vals = r + z * times + phi
    return np.array([times, tdependent_vals])

def calculate_func(rmax, zmax, phi, param):
    rvals = np.linspace(0, rmax, 5)
    zvals = np.linspace(0, zmax, 5)
    for r in rvals:
        func_at_r = lambda z: function(r, phi, z, param)[1]
        with multiproc.Pool(2) as pool:
            fieldvals = np.array([*pool.map(func_at_r, zvals)])
        print(fieldvals)  # for test, it's actually saved in a numpy array

calculate_func(3., 4., 5., 6.)
If I run this, it fails with
AttributeError: Can't pickle local object 'calculate_func.<locals>.<lambda>'
What I think the reason is: according to the documentation, only top-level defined functions can be pickled, and my in-function defined lambda can't. But I don't see any way I could make it a standalone function, at least without polluting the module with a bunch of top-level variables: the parameters are unknown before calculate_func is called, and they change at each iteration over rvals. This whole multiprocessing thing is very new to me, and I couldn't come up with an alternative. What would be the simplest working way to parallelize the loop over rvals and zvals?
Note: I used this answer as a starting point.
Answer
This probably isn't the best answer for this, but it's an answer, so please no hate :)
You can just write a top-level wrapper function that can be serialized and have it execute functions... This is kind of like function inception, but I solved a similar problem in my code like this.
Here is a short example:
import base64
import marshal
import types
import multiprocessing as mp

def wrapper(task):
    # rebuild the function from its marshalled code object and call it
    func_str, args = task
    code = marshal.loads(base64.b64decode(func_str))
    func = types.FunctionType(code, globals(), "wrapped_func")
    return func(*args)

def run_func(func, args_list):
    # func may be a lambda or nested function (but must not capture
    # closure variables); each element of args_list is one argument tuple
    func_str = base64.b64encode(marshal.dumps(func.__code__, 0))
    task_list = [(func_str, args) for args in args_list]
    with mp.Pool(2) as pool:
        results = pool.map(wrapper, task_list)
    return results
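To see what the wrapper is doing, here is a minimal round-trip (my own illustration, not from the original answer): only the lambda's code object is serialized, which is why the trick works for non-picklable functions but fails for ones that capture closure variables, since those live outside the code object.

```python
import base64
import marshal
import types

# a lambda normally can't be pickled for multiprocessing ...
f = lambda x, y: x * y

# ... but its code object can be marshalled and shipped as bytes
func_str = base64.b64encode(marshal.dumps(f.__code__, 0))

# on the receiving side, rebuild a callable from those bytes
code = marshal.loads(base64.b64decode(func_str))
rebuilt = types.FunctionType(code, globals(), "wrapped_func")

print(rebuilt(6, 7))  # 42
```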