PicklingError when using multiprocessing


Question

I am having trouble using Pool.map_async() (and also Pool.map()) from the multiprocessing module. I have implemented a parallel for-loop function that works fine as long as the function passed to Pool.map_async is a "regular" function. When the function is, for example, a method of a class, I get a PicklingError:

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
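
The failure can be reproduced without a Pool at all. Here is a minimal sketch (the make_closure name is made up for illustration) using Python 2's cPickle; the exact wording of the message varies with the pickle implementation and protocol:

import cPickle

def make_closure(n):
    # Nested functions are pickled by name, and this name is not
    # importable from the module top level, so pickling fails.
    def inner(x):
        return x + n
    return inner

cPickle.dumps(make_closure(1))
# PicklingError: Can't pickle <function inner at 0x...>:
# attribute lookup __main__.inner failed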

I use Python only for scientific computing, so I am not very familiar with the concept of pickling; I only learned a bit about it today. I have looked at a couple of previous answers, such as Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), but I cannot figure out how to make it work, even when following the link provided in the answer.

Below is my code, where the objective is to simulate a vector of Normal random variates using multiple cores. Note that this is just an example; it may not even pay off to run it on multiple cores.

import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
    """
    Purpose: Evaluate a function using multiple cores.

    Input:
        func       - Function to evaluate in parallel
        args       - Array of arguments at which to evaluate func
        static_arg - The "static" arguments (if any), i.e. the variables that
                     are constant in every evaluation of func.
        nWorkers   - Number of worker processes.
    Output:
        func(i, static_arg) for i in args.

    """
    # Prepare arguments for func: pair each argument with the static
    # arguments (if any).
    if static_arg is not None:
        arguments = [[arg] + static_arg for arg in list(args)]
    else:
        arguments = args

    # Initialize workers
    pool = mp.Pool(processes = nWorkers)

    # Evaluate function
    result = pool.map_async(func, arguments, chunksize = chunksize)
    pool.close()
    pool.join()

    return sp.array(result.get()).flatten()

# First test-function. Freeze location and scale for the Normal random variates
# generator. This returns a nested function (a closure). Closures, like bound
# methods, cannot be pickled, so this will give an error.
def genNorm(loc, scale):
    def subfunc(a):
        return spstat.norm.rvs(loc = loc, scale = scale, size = a)
    return subfunc

# Second test-function. The same as above, but defined as a plain module-level
# function, which can be pickled.
def test(fargs):
    x, a, b = fargs
    return spstat.norm.rvs(size = x, loc = a, scale = b)

# Try it out.
N = 1000000

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standardized normal.

# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)

# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
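
A common workaround for the first test-function (a sketch, not part of the original code; the GenNorm name is made up) is to replace the closure with a small module-level class: instances are picklable as long as the class itself is importable, and an instance with a __call__ method behaves just like the closure:

class GenNorm(object):
    """Picklable stand-in for the closure returned by genNorm."""
    def __init__(self, loc, scale):
        self.loc = loc
        self.scale = scale

    def __call__(self, a):
        return spstat.norm.rvs(loc = self.loc, scale = self.scale, size = a)

# Drop-in replacement for the failing case above:
func = GenNorm(0, 1)
sim = parfor(func, args1, nWorkers = 12)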

Following the link provided in the answer to Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), Steven Bethard suggests (almost at the end) using the copy_reg module. His code is:

import copy_reg
import types

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Walk the MRO so that overridden methods resolve to the right class.
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

I don't really understand how I can make use of this. The only thing I could come up with was putting it just before my code, but that did not help. A simple solution would of course be to just go with the approach that works and avoid getting involved with copy_reg, but I am more interested in getting copy_reg to work properly, so that I can take full advantage of multiprocessing without having to work around the problem each time.
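
One detail worth noting: copy_reg.pickle(types.MethodType, ...) registers a reducer for bound methods only, while genNorm above returns a nested plain function (a closure), which the registration does not cover; that would explain why installing it appears to have no effect here. A minimal sketch (the Sampler class is made up for illustration) of how the recipe is meant to be used: install the registration at module level, before any Pool is created, and bound methods of picklable instances can then be passed directly:

import multiprocessing as mp

class Sampler(object):
    def __init__(self, loc, scale):
        self.loc = loc
        self.scale = scale

    def rvs(self, size):
        return spstat.norm.rvs(loc = self.loc, scale = self.scale, size = size)

# With the copy_reg registration from the recipe installed at module level:
pool = mp.Pool(processes = 4)
sim = pool.map(Sampler(0, 1).rvs, [1] * 1000)
pool.close()
pool.join()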

Many thanks for your help.

Matias

Recommended Answer

The problem here is less the "pickle" error message than a conceptual one: multiprocessing forks your code into different "worker" processes in order to perform its magic.

It then sends data to and from the different processes by seamlessly serializing and de-serializing it (that is the part that uses pickle).

When part of the data passed back and forth is a function, it assumes a function with the same name exists in the callee process and (I guess) passes the function name as a string. Since functions are stateless, the worker process just calls that same function with the data it has received. (Python function bodies can't be serialized through pickle, so just a by-name reference is passed between the master and the worker processes.)
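
This by-name behaviour can be seen directly in a pickle stream; a small sketch (the plain function is made up for illustration):

import cPickle
import pickletools

def plain(x):
    return 2 * x

# The stream records only a global reference (module + name), no code object:
pickletools.dis(cPickle.dumps(plain))
#    0: c    GLOBAL     '__main__ plain'
#    ...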

When your function is a method of an instance, things are different underneath, even though in Python code it looks much like a plain function with an "automatic" self variable: instances (objects) are stateful. That means the worker process does not have a copy of the object that owns the method you want to call on the other side.

Tricks for passing your method as a function to the map_async call won't work either, as multiprocessing just passes a by-name function reference around, not the actual function.

So, you should either (1) change your code so that you do pass a function, and not a method, to the worker processes, converting whatever state the object keeps into new parameters of the call (the test function in the question is exactly this pattern), or (2) create a "target" function for the map_async call that reconstructs the needed object on the worker-process side and then calls the method on it. Most straightforward classes in Python are picklable, so you can pass the object that owns the method along in the map_async call, and the "target" function then invokes the appropriate method on the worker side.

Option (2) may sound "difficult", but it is probably just something like the following, unless your object's class can't be pickled:

import types

def target(packed):
    # packed = (instance, method_name, argument); the instance has been
    # unpickled on the worker side, so call the method on it directly.
    # This assumes func takes a single argument, as in the example above.
    obj, method_name, arg = packed
    return getattr(obj, method_name)(arg)

(...)

    # And change the "Evaluate function" step in parfor to:
    if isinstance(func, types.MethodType):
        # Pack the method's instance and name in with each argument;
        # the instance itself must be picklable for this to work.
        arguments = [(func.im_self, func.__name__, arg) for arg in arguments]
        func = target
    result = pool.map_async(func, arguments, chunksize = chunksize)

*disclaimer: I haven't tested this
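
For what it's worth, a hypothetical end-to-end use of the patched parfor with a bound method, in the same untested spirit (the NormSampler name is made up):

class NormSampler(object):
    def __init__(self, loc, scale):
        self.loc = loc
        self.scale = scale

    def rvs(self, size):
        return spstat.norm.rvs(loc = self.loc, scale = self.scale, size = size)

# parfor detects the bound method, packs (instance, method name) in with
# each argument, and dispatches through target() in the workers:
sampler = NormSampler(0, 1)
sim = parfor(sampler.rvs, args1, nWorkers = 12)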

