类方法的Python多处理错误 [英] Python multiprocessing error with class methods

查看:93
本文介绍了类方法的Python多处理错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个程序,其中有面向对象的代码,试图在其中进行多处理.我遇到了泡菜错误,因为默认情况下python可以序列化函数,但不能序列化类方法.所以我在上不能使用建议泡菜<类型'实例方法'>当使用python的多处理Pool.map()时,问题是如果我的方法中有一些lambda表达式,则该表达式不起作用. 我的示例代码如下:

import numpy as np

from copy_reg import pickle
from types import MethodType
from multiprocessing.pool import ApplyResult
from _functools import partial
from _collections import defaultdict


class test(object):
    def __init__(self,words):
        self.words=words
#         self.testLambda = defaultdict(lambda : 1.)

    def parallel_function(self,f):
        def easy_parallize(f,sequence):
            from multiprocessing import Pool
            pool = Pool(processes=50) # depends on available cores
            result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
            cleaned = [x for x in result if not x is None] # getting results
            cleaned = np.asarray(cleaned)
            pool.close() # not optimal! but easy
            pool.join()
            return cleaned
        from functools import partial


        return partial(easy_parallize, f)

    def dummy(self):
        self.t=defaultdict(lambda:1.)

    def test(self,a,b,x):
        print x
        print a
        return x*x

    def testit(self):
        sequence=[1,2,3,4,5]
        f1=partial(self.test,'a','b')
        f_p=self.parallel_function(f1)
        results=f_p(sequence)


def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)



if __name__ ==   "__main__":
    pickle(MethodType, _pickle_method, _unpickle_method)
    t=test('fdfs')
    t.dummy()
    t.testit()

但是由于lambda表达式,出现以下错误:

Traceback (most recent call last):
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 76, in <module>
    t.testit()
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 51, in testit
    results=f_p(sequence)
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 28, in easy_parallize
    result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

是否有任何直接的方法可以解决此问题,而无需使用其他使用莳萝或其他东西的包装?可以使用普通的python库完成此操作吗? (我正在使用python 2.7)

解决方案

如果您在发布的链接中往下看……我的答案(pathos使用dill,这是一个序列化程序,可以使python中的几乎所有内容腌制.您甚至可以动态替换该方法,并且该方法仍然有效.

>>> def parallel_funtion(self, f):
...   def easy_parallelize(f, sequence):
...     p = Pool()
...     return p.map(f, sequence)
...   return partial(easy_parallelize, f)
... 
>>> test.parallel_function = parallel_funtion 
>>> 
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
[None, None, None, None, None]

在此处获取pathosdill: https://github.com/uqfoundation

I am writing a program where I have object oriented code where I am trying to do multiprocessing. I was getting pickle errors because by default python can serialize functions but not class methods. So I used suggestion on Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map() but the problem is that if I have some lambda expressions inside my methods it's not working. My sample code is as follows:

import numpy as np

from copy_reg import pickle
from types import MethodType
from multiprocessing.pool import ApplyResult
from _functools import partial
from _collections import defaultdict


class test(object):
    def __init__(self,words):
        self.words=words
#         self.testLambda = defaultdict(lambda : 1.)

    def parallel_function(self,f):
        def easy_parallize(f,sequence):
            from multiprocessing import Pool
            pool = Pool(processes=50) # depends on available cores
            result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
            cleaned = [x for x in result if not x is None] # getting results
            cleaned = np.asarray(cleaned)
            pool.close() # not optimal! but easy
            pool.join()
            return cleaned
        from functools import partial


        return partial(easy_parallize, f)

    def dummy(self):
        self.t=defaultdict(lambda:1.)

    def test(self,a,b,x):
        print x
        print a
        return x*x

    def testit(self):
        sequence=[1,2,3,4,5]
        f1=partial(self.test,'a','b')
        f_p=self.parallel_function(f1)
        results=f_p(sequence)


def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)



if __name__ ==   "__main__":
    pickle(MethodType, _pickle_method, _unpickle_method)
    t=test('fdfs')
    t.dummy()
    t.testit()

But I get following error due to lambda expression:

Traceback (most recent call last):
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 76, in <module>
    t.testit()
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 51, in testit
    results=f_p(sequence)
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 28, in easy_parallize
    result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Is there any straight forward way to tackle it without moving to some other package which uses dill or something? Can this be done with normal python libraries? (I am using python 2.7)

解决方案

If you look further down in the link you posted… to my answer (https://stackoverflow.com/a/21345273/2379433), you'll see you can indeed do what you want to do… even if you use lambdas and default dicts and all sorts of other python constructs. All you have to do is replace multiprocessing with pathos.multiprocessing… and it works. Note, I'm even working in the interpreter.

>>> import numpy as np
>>> from functools import partial
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from collections import defaultdict
>>> 
>>> class test(object):
...   def __init__(self, words):
...     self.words = words
...   def parallel_function(self, f):
...     def easy_parallelize(f, sequence):
...       p = Pool()
...       result = p.map(f, sequence)
...       cleaned = [x for x in result if not x is None]
...       cleaned = np.asarray(cleaned)
...       return cleaned
...     return partial(easy_parallelize, f)
...   def dummy(self):
...     self.t = defaultdict(lambda: 1.)
...   def test(self, a, b, x):
...     print x
...     print a
...     print x*x
...   def testit(self):
...     sequence = [1,2,3,4,5]
...     f1 = partial(self.test, 'a','b')
...     f_p = self.parallel_function(f1)
...     results = f_p(sequence)
...     return results
... 
>>> t = test('fdfs')
>>> t.dummy()
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
array([], dtype=float64)

"It works" because pathos uses dill, which is a serializer that can pickle almost anything in python. You can even dynamically replace the method, and it still works.

>>> def parallel_funtion(self, f):
...   def easy_parallelize(f, sequence):
...     p = Pool()
...     return p.map(f, sequence)
...   return partial(easy_parallelize, f)
... 
>>> test.parallel_function = parallel_funtion 
>>> 
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
[None, None, None, None, None]

Get pathos and dill here: https://github.com/uqfoundation

这篇关于类方法的Python多处理错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆