python 3.4 multiprocessing不适用于unittest [英] python 3.4 multiprocessing does not work with unittest
问题描述
我有一个使用多重处理的单元测试.
I have an unittest which is using multiprocessing.
从Python 3.2升级到Python 3.4后,出现以下错误. 我找不到任何提示,无法在Python内部进行更改以及必须更改才能使代码运行.
After upgrading from Python 3.2 to Python 3.4 I get following error. I can't find a hint, what was changed inside Python and what I have to change, to make my code running.
先谢谢了.
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python341_64\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\Python341_64\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
EOFError: Ran out of input
Error
Traceback (most recent call last):
File "D:\test_multiproc.py", line 46, in testSmallWorkflow
p.start()
File "C:\Python341_64\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Python341_64\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Python341_64\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Python341_64\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Python341_64\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
下面是一个示例代码,我该如何重现该错误:
Following a sample code, how I can reproduce the error:
import shutil
import traceback
import unittest
import time
from multiprocessing import Process
import os
class MyTest(unittest.TestCase):
#---------------------------------------------------------------------------
def setUp(self):
self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
os.mkdir(self.working_dir)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def tearDown(self):
try:
time.sleep(5)
shutil.rmtree(self.working_dir, ignore_errors=True)
except OSError as err:
traceback.print_tb(err)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def info(self, title):
print(title)
print('module name:', __name__)
if hasattr(os, 'getppid'): # only available on Unix
print('parent process:', os.getppid())
print('process id:', os.getpid())
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def f(self, name):
self.info('function f')
print('hello', name)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def testSmallWorkflow(self):
self.info('main line')
p = Process(target=self.f, args=('bob',))
p.start()
p.join()
#---------------------------------------------------------------------------
推荐答案
问题是unittest.TestCase
类本身不再是可腌制的,并且您必须对其进行腌制以腌制其绑定方法之一(
The problem is that the unittest.TestCase
class itself is no longer pickleable, and you have to pickle it in order to pickle one of its bound methods (self.f
). An easy workaround would be to create a separate class for the methods you need to call in the child process:
class Tester:
def info(self, title=None):
print("title {}".format(title))
print('module name:', __name__)
if hasattr(os, 'getppid'): # only available on Unix
print('parent process:', os.getppid())
print('process id:', os.getpid())
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def f(self, name):
self.info('function f')
print('hello', name)
#-------------------------------
class MyTest(unittest.TestCase):
#---------------------------------------------------------------------------
def setUp(self):
self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
os.mkdir(self.working_dir)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def tearDown(self):
try:
time.sleep(5)
shutil.rmtree(self.working_dir, ignore_errors=True)
except OSError as err:
traceback.print_tb(err)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def testSmallWorkflow(self):
t = Tester()
self.info('main line')
p = Process(target=t.f, args=('bob',))
p.start()
p.join()
或者,您使用 __setstate__
/__getstate__
从TestCase
中删除无法修复的对象.在这种情况下,它是一个称为_Outcome
的内部类.我们不在乎孩子中的它,因此我们可以将其从腌制状态中删除:
Alternatively, you use __setstate__
/__getstate__
to remove the object from the TestCase
that's unpickleable. In this case, it's an internal class called _Outcome
. We don't care about it in the child, so we can just delete it from the pickled state:
class MyTest(unittest.TestCase):
#---------------------------------------------------------------------------
def setUp(self):
self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
os.mkdir(self.working_dir)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def tearDown(self):
try:
time.sleep(2)
shutil.rmtree(self.working_dir, ignore_errors=True)
except OSError as err:
traceback.print_tb(err)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def info(self, title=None):
print("title {}".format(title))
print('module name:', __name__)
if hasattr(os, 'getppid'): # only available on Unix
print('parent process:', os.getppid())
print('process id:', os.getpid())
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def f(self, name):
self.info('function f')
print('hello', name)
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def testSmallWorkflow(self):
t = Tester()
self.info('main line')
p = Process(target=self.f, args=('bob',))
p.start()
p.join()
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['_outcome']
return self_dict
def __setstate(self, state):
self.__dict__.update(self_dict)
这篇关于python 3.4 multiprocessing不适用于unittest的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!