为什么我可以将实例方法传递给multiprocessing.Process,但不能传递给multiprocessing.Pool? [英] Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?
问题描述
我正在尝试编写一个与multiprocessing.Pool
同时应用功能的应用程序.我希望这个函数是一个实例方法(所以我可以在不同的子类中不同地定义它).这似乎是不可能的.正如我在其他地方学到的,显然绑定方法无法腌.那么,为什么以绑定方法作为目标启动multiprocessing.Process
呢?以下代码:
I am trying to write an application that applies a function concurrently with a multiprocessing.Pool
. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, apparently bound methods can't be pickled. So why does starting a multiprocessing.Process
with a bound method as a target work? The following code:
import multiprocessing
def test1():
print "Hello, world 1"
def increment(x):
return x + 1
class testClass():
def process(self):
process1 = multiprocessing.Process(target=test1)
process1.start()
process1.join()
process2 = multiprocessing.Process(target=self.test2)
process2.start()
process2.join()
def pool(self):
pool = multiprocessing.Pool(1)
for answer in pool.imap(increment, range(10)):
print answer
print
for answer in pool.imap(self.square, range(10)):
print answer
def test2(self):
print "Hello, world 2"
def square(self, x):
return x * x
def main():
c = testClass()
c.process()
c.pool()
if __name__ == "__main__":
main()
产生此输出:
Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
self.run()
File "C:\Python27\Lib\threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
为什么进程可以处理绑定方法,而池不能处理?
Why can Processes handle bound methods, but not Pools?
推荐答案
pickle
模块通常不能腌制实例方法:
The pickle
module normally can't pickle instance methods:
>>> import pickle
>>> class A(object):
... def z(self): print "hi"
...
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/local/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects
但是,multiprocessing
模块具有一个自定义的Pickler
,该添加了一些启用此功能的代码:
However, the multiprocessing
module has a custom Pickler
that adds some code to enable this feature:
#
# Try making some callable types picklable
#
from pickle import Pickler
class ForkingPickler(Pickler):
dispatch = Pickler.dispatch.copy()
@classmethod
def register(cls, type, reduce):
def dispatcher(self, obj):
rv = reduce(obj)
self.save_reduce(obj=obj, *rv)
cls.dispatch[type] = dispatcher
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
您可以使用 copy_reg
模块复制此文件,以查看它是否可以自己运行:
You can replicate this using the copy_reg
module to see it work for yourself:
>>> import copy_reg
>>> def _reduce_method(m):
... if m.im_self is None:
... return getattr, (m.im_class, m.im_func.func_name)
... else:
... return getattr, (m.im_self, m.im_func.func_name)
...
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."
在Windows上使用Process.start
生成新进程时,使用此自定义ForkingPickler
腌制您传递给子进程的所有参数:
When you use Process.start
to spawn a new process on Windows, it pickles all the parameters you passed to the child process using this custom ForkingPickler
:
#
# Windows
#
else:
# snip...
from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
#
# We define a Popen class similar to the one from subprocess, but
# whose constructor takes a process object as its argument.
#
class Popen(object):
'''
Start a subprocess to run the code of a process object
'''
_tls = thread._local()
def __init__(self, process_obj):
# create pipe for communication with child
rfd, wfd = os.pipe()
# get handle for read end of the pipe and make it inheritable
...
# start process
...
# set attributes of self
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, 'wb')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
请注意将信息发送给孩子"部分.它使用dump
函数,该函数使用ForkingPickler
腌制数据,这意味着可以腌制您的实例方法.
Note the "send information to the child" section. It's using the dump
function, which uses ForkingPickler
to pickle the data, which means your instance method can be pickled.
现在,当您在multiprocessing.Pool
上使用方法将方法发送给子进程时,它正在使用multiprocessing.Pipe
来腌制数据.在Python 2.7中,multiprocessing.Pipe
是用C实现的,并直接调用pickle_dumps
,因此它没有利用ForkingPickler
的优势.这意味着腌制实例方法不起作用.
Now, when you use methods on multiprocessing.Pool
to send a method to a child process, it's using a multiprocessing.Pipe
to pickle the data. In Python 2.7, multiprocessing.Pipe
is implemented in C, and calls pickle_dumps
directly, so it doesn't take advantage of the ForkingPickler
. That means pickling the instance method doesn't work.
但是,如果使用copy_reg
而不是自定义的Pickler
来注册instancemethod
类型,则所有酸洗的尝试都会受到影响.因此,即使通过Pool
,您也可以使用它来启用酸洗实例方法:
However, if you use copy_reg
to register the instancemethod
type, rather than a custom Pickler
, all attempts at pickling will be affected. So you can use that to enable pickling instance methods, even via Pool
:
import multiprocessing
import copy_reg
import types
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)
def test1():
print("Hello, world 1")
def increment(x):
return x + 1
class testClass():
def process(self):
process1 = multiprocessing.Process(target=test1)
process1.start()
process1.join()
process2 = multiprocessing.Process(target=self.test2)
process2.start()
process2.join()
def pool(self):
pool = multiprocessing.Pool(1)
for answer in pool.imap(increment, range(10)):
print(answer)
print
for answer in pool.imap(self.square, range(10)):
print(answer)
def test2(self):
print("Hello, world 2")
def square(self, x):
return x * x
def main():
c = testClass()
c.process()
c.pool()
if __name__ == "__main__":
main()
输出:
Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
1GOT (0, 5, (True, 6))
GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10
GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
GOT (1, 6, (True, 36))
36
GOT (1, 7, (True, 49))
49
GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None
还请注意,在Python 3.x中,pickle
可以本地腌制实例方法类型,因此这些东西都不再重要了. :)
Also note that in Python 3.x, pickle
can pickle instance method types natively, so none of this stuff matters any more. :)
这篇关于为什么我可以将实例方法传递给multiprocessing.Process,但不能传递给multiprocessing.Pool?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!