How to use non-top-level functions in parallelization?


Problem description

I'd like to use multiprocessing in a rescource-heavy computation in a code I write, as shown in this watered-down example:

import numpy as np
import multiprocessing as multiproc

def function(r, phi, z, params):
    """returns an array of the timepoints and the corresponding values 
       (demanding computation in actual code, with iFFT and stuff)"""
    times = np.array([1.,2.,3.])
    tdependent_vals = r + z * times + phi
    return np.array([times, tdependent_vals])

def calculate_func(rmax, zmax, phi, param):
    rvals = np.linspace(0,rmax,5)
    zvals = np.linspace(0,zmax,5)
    for r in rvals:
        func_at_r = lambda z: function(r, phi, z, param)[1]
        with multiproc.Pool(2) as pool:
             fieldvals = np.array([*pool.map(func_at_r, zvals)])
             print(fieldvals) #for test, it's actually saved in a numpy array

calculate_func(3.,4.,5.,6.)

If I run this, it fails with

AttributeError: Can't pickle local object 'calculate_func.<locals>.<lambda>'

The reason, I think, is that according to the documentation only functions defined at the top level of a module can be pickled, and my lambda defined inside a function can't be. But I don't see any way I could make it a standalone function, at least not without polluting the module with a bunch of top-level variables: the parameters are unknown before calculate_func is called, and they change at each iteration over rvals. This whole multiprocessing thing is very new to me, and I couldn't come up with an alternative. What would be the simplest working way to parallelize the loop over rvals and zvals?

Note: I used this answer as a starting point.
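The restriction itself is easy to reproduce in isolation: a module-level function pickles fine (by reference to its importable name), while a locally defined lambda has no such name and fails just like in the traceback above.

```python
import pickle

def top_level(x):
    return x + 1

# A module-level function is pickled by reference to its qualified name.
pickle.dumps(top_level)  # works

def outer():
    # A function defined inside another function has no importable name.
    return lambda x: x + 1

try:
    pickle.dumps(outer())
except (pickle.PicklingError, AttributeError) as exc:
    # e.g. "Can't pickle local object 'outer.<locals>.<lambda>'"
    print(exc)
```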

Recommended answer

This probably isn't the best answer for this, but it's an answer, so please no hate :)

You can write a top-level wrapper function that can be serialized and have it execute other functions. It's a bit like function inception, but I solved a similar problem in my own code this way.

Here is a short example:

import base64
import marshal
import multiprocessing as mp
import types

def wrapper(arg_list):
    """Top-level function the pool can pickle; rebuilds and calls the real one."""
    func_str, args = arg_list
    # Deserialize the code object and rebuild a callable from it.
    # Caveat: this only works for functions without free variables (no closures).
    code = marshal.loads(base64.b64decode(func_str))
    func = types.FunctionType(code, globals(), "wrapped_func")
    return func(*args)

def run_func(func, arg_lists):
    """Run func in a pool, one task per argument tuple in arg_lists."""
    func_str = base64.b64encode(marshal.dumps(func.__code__))
    tasks = [(func_str, args) for args in arg_lists]
    with mp.Pool(2) as pool:
        results = pool.map(wrapper, tasks)
    return results
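As a sanity check, the serialize/rebuild round-trip at the heart of the wrapper can be exercised without a pool. The names here are hypothetical, and note the limitation: types.FunctionType can't rebuild closures from a bare code object, so a closure-free function is used and any would-be closed-over values (like the question's r, phi and param) must be passed as explicit arguments instead.

```python
import base64
import marshal
import types

# A closure-free function standing in for the real computation.
add = lambda x, y: x + y

# Serialize the code object the same way run_func does...
blob = base64.b64encode(marshal.dumps(add.__code__))

# ...then rebuild and call it the way wrapper does.
code = marshal.loads(base64.b64decode(blob))
rebuilt = types.FunctionType(code, globals(), "wrapped_func")
print(rebuilt(2, 3))  # → 5
```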
