Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?


Question


I am trying to write an application that applies a function concurrently with a multiprocessing.Pool. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, apparently bound methods can't be pickled. So why does starting a multiprocessing.Process with a bound method as a target work? The following code:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Produces this output:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Why can Processes handle bound methods, but not Pools?

Solution

The pickle module normally can't pickle instance methods:

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
... 
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

However, the multiprocessing module has a custom Pickler that adds some code to enable this feature:

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
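The idea behind ForkingPickler (a Pickler subclass that carries its own reducers) can be sketched in Python 3, where the per-pickler hook is the documented dispatch_table attribute rather than the 2.7 dispatch dict shown above. This is only an illustration under assumptions: memoryview stands in for a type that pickle rejects by default, and PrivatePickler, private_dumps and _reduce_memoryview are hypothetical names, not part of multiprocessing.

```python
import copyreg
import io
import pickle


def _reduce_memoryview(view):
    # Hypothetical reducer: rebuild the view from a bytes copy of its contents.
    return memoryview, (view.tobytes(),)


class PrivatePickler(pickle.Pickler):
    # Class-level table that only this Pickler subclass consults.
    dispatch_table = copyreg.dispatch_table.copy()
    dispatch_table[memoryview] = _reduce_memoryview


def private_dumps(obj):
    # Pickle obj with the subclass instead of the module-level pickle.dumps.
    buf = io.BytesIO()
    PrivatePickler(buf).dump(obj)
    return buf.getvalue()


view = memoryview(b"abc")

# The plain pickler knows nothing about the private reducer.
try:
    pickle.dumps(view)
    plain_pickle_ok = True
except (TypeError, pickle.PicklingError):
    plain_pickle_ok = False

# The subclass can pickle the same object, and the result loads normally.
restored = pickle.loads(private_dumps(view))
```

The reducer lives on the subclass, so only code that pickles through that subclass benefits from it. That is the same limitation the rest of this answer runs into with Pool.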

You can replicate this using the copy_reg module to see it work for yourself:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
... 
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__
getattr
p0
(ccopy_reg
_reconstructor
p1
(c__main__
A
p2
c__builtin__
object
p3
Ntp4
Rp5
S'z'
p6
tp7
Rp8
."

When you use Process.start to spawn a new process on Windows, it pickles all the parameters you passed to the child process using this custom ForkingPickler:

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

Note the "send information to the child" section. It's using the dump function, which uses ForkingPickler to pickle the data, which means your instance method can be pickled.

Now, when you use the methods on multiprocessing.Pool to send a method to a child process, the task travels over a multiprocessing.Pipe, which does its own pickling. In Python 2.7, multiprocessing.Pipe is implemented in C and calls pickle_dumps directly, so it doesn't take advantage of the ForkingPickler. That means pickling the instance method doesn't work.
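For contrast, here is a hedged Python 3 sketch; it does not apply to 2.7. In the CPython 3 releases I am aware of, Connection.send is written in Python and pickles through multiprocessing.reduction.ForkingPickler, so a reducer registered there is honoured by a Pipe. The multiprocessing.reduction module is only lightly documented, memoryview is again a stand-in for a type pickle rejects by default, and _reduce_memoryview is a hypothetical helper.

```python
import multiprocessing
import pickle
from multiprocessing.reduction import ForkingPickler


def _reduce_memoryview(view):
    # Hypothetical reducer: rebuild the view from a bytes copy of its contents.
    return memoryview, (view.tobytes(),)


# Teach multiprocessing's own pickler (and only that pickler) about memoryview.
ForkingPickler.register(memoryview, _reduce_memoryview)

view = memoryview(b"abc")

# The module-level pickle functions are unaffected by the registration.
try:
    pickle.dumps(view)
    plain_pickle_ok = True
except (TypeError, pickle.PicklingError):
    plain_pickle_ok = False

# A one-way pipe used inside a single process: send() pickles, recv() unpickles.
receiver, sender = multiprocessing.Pipe(duplex=False)
sender.send(view)
received = receiver.recv()
sender.close()
receiver.close()
```

No child process is started here; both ends of the pipe stay in the same process, which is enough to exercise the pickling path.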

However, if you use copy_reg to register the instancemethod type, rather than a custom Pickler, all attempts at pickling will be affected. So you can use that to enable pickling instance methods, even via Pool:

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Output:

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
 1GOT (0, 5, (True, 6))

GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
 GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
 GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
 GOT (1, 6, (True, 36))
36
 GOT (1, 7, (True, 49))
49
 GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None
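The global effect of a copy_reg registration can also be sketched in Python 3, where the module is named copyreg. This is an illustration under assumptions rather than a port of the code above: memoryview stands in for a type that pickle rejects by default, and _reduce_memoryview is a hypothetical helper.

```python
import copyreg
import pickle


def _reduce_memoryview(view):
    # Hypothetical reducer: rebuild the view from a bytes copy of its contents.
    return memoryview, (view.tobytes(),)


view = memoryview(b"abc")

# Before registration, the module-level pickler rejects the object.
try:
    pickle.dumps(view)
    ok_before = True
except (TypeError, pickle.PicklingError):
    ok_before = False

# Register the reducer in the global table that pickle consults by default.
copyreg.pickle(memoryview, _reduce_memoryview)

# Now plain pickle.dumps works, with no custom Pickler involved.
restored = pickle.loads(pickle.dumps(view))
```

Because the registration is process-wide, it changes pickling for every library in the process that relies on the default table, which is worth keeping in mind before doing this in shared code.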

Also note that in Python 3.x, pickle can pickle instance method types natively, so none of this stuff matters any more. :)
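A quick way to check this on a Python 3 interpreter (3.4 or later, as far as I know) is to round-trip a bound method through pickle. The sketch below uses collections.Counter only because it is a standard-library class whose methods are ordinary Python functions; any picklable instance should behave the same way.

```python
import pickle
import types
from collections import Counter

counts = Counter("aab")
method = counts.most_common

# most_common is a regular bound method, the Python 3 analogue of instancemethod.
is_bound_method = isinstance(method, types.MethodType)

# No copyreg registration and no custom Pickler are needed.
restored = pickle.loads(pickle.dumps(method))

# The restored method is bound to an unpickled copy of the Counter.
top = restored(1)
same_object = restored.__self__ is counts
```

Note that the instance is pickled along with the method, so the restored method operates on a copy rather than on the original object. The same is true of the _reduce_method approach above, which pickles m.im_self.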
