Python 3.4 multiprocessing does not work with unittest

Problem description

I have a unittest that uses multiprocessing.

After upgrading from Python 3.2 to Python 3.4, I get the following error. I can't find any hint about what changed inside Python or what I have to change to make my code run.

Thanks in advance.

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python341_64\lib\multiprocessing\spawn.py", line 106, in spawn_main
    exitcode = _main(fd)
  File "C:\Python341_64\lib\multiprocessing\spawn.py", line 116, in _main
    self = pickle.load(from_parent)
EOFError: Ran out of input

Error
Traceback (most recent call last):
  File "D:\test_multiproc.py", line 46, in testSmallWorkflow
    p.start()
  File "C:\Python341_64\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python341_64\lib\multiprocessing\context.py", line 212, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python341_64\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\Python341_64\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python341_64\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object

Here is sample code that reproduces the error:

import shutil
import traceback
import unittest
import time
from multiprocessing import Process
import os


class MyTest(unittest.TestCase):

    #---------------------------------------------------------------------------
    def setUp(self):
        self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
        os.mkdir(self.working_dir)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def tearDown(self):
        try:
            time.sleep(5)
            shutil.rmtree(self.working_dir, ignore_errors=True)
        except OSError as err:
            traceback.print_tb(err.__traceback__)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def info(self, title):
        print(title)
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # only available on Unix
            print('parent process:', os.getppid())
        print('process id:', os.getpid())
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def f(self, name):
        self.info('function f')
        print('hello', name)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def testSmallWorkflow(self):
        self.info('main line')
        p = Process(target=self.f, args=('bob',))
        p.start()
        p.join()
    #---------------------------------------------------------------------------

Recommended answer

The problem is that the unittest.TestCase instance itself is no longer picklable, and it has to be pickled in order to pickle one of its bound methods (self.f). An easy workaround is to create a separate class for the methods you need to call in the child process:

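import os
import shutil
import time
import traceback
import unittest
from multiprocessing import Process


class Tester: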
    def info(self, title=None):
        print("title {}".format(title))
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # only available on Unix
            print('parent process:', os.getppid())
        print('process id:', os.getpid())
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def f(self, name):
        self.info('function f')
        print('hello', name)
    #---------------------------------------------------------------------------


class MyTest(unittest.TestCase):

    #---------------------------------------------------------------------------
    def setUp(self):
        self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
        os.mkdir(self.working_dir)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def tearDown(self):
        try:
            time.sleep(5)
            shutil.rmtree(self.working_dir, ignore_errors=True)
        except OSError as err:
            traceback.print_tb(err.__traceback__)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def testSmallWorkflow(self):
        t = Tester()
        t.info('main line')
        p = Process(target=t.f, args=('bob',))
        p.start()
        p.join()
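
To see why moving the methods into a plain class helps, here is a minimal sketch (not part of the original answer) showing that pickling a bound method also pickles the instance it is bound to. The names Holder, Plain and can_pickle are hypothetical, and the open file handle only stands in for the stream that the test case references indirectly:

```python
import os
import pickle


class Holder:
    """Hypothetical stand-in for an object that owns an unpicklable attribute."""

    def __init__(self):
        # An open text file is an _io.TextIOWrapper, which cannot be pickled.
        self.stream = open(os.devnull, "w")

    def f(self, name):
        return "hello " + name


class Plain:
    """Hypothetical helper class with no unpicklable state."""

    def f(self, name):
        return "hello " + name


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


holder = Holder()
bound_ok = can_pickle(Plain().f)    # only a Plain instance has to be pickled
bound_fails = can_pickle(holder.f)  # pickling the method drags holder.stream along
holder.stream.close()
```

Under the spawn start method used on Windows, Process has to pickle its target in the same way, which is why a target bound to a lightweight helper object avoids the error.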

Alternatively, you can use __getstate__/__setstate__ to remove the unpicklable object from the TestCase's pickled state. In this case, it is an instance of an internal class called _Outcome, stored in the _outcome attribute. We don't need it in the child process, so we can simply delete it from the pickled state:

class MyTest(unittest.TestCase):

    #---------------------------------------------------------------------------
    def setUp(self):
        self.working_dir = os.path.join(os.environ["TEMP"], "Testing")
        os.mkdir(self.working_dir)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def tearDown(self):
        try:
            time.sleep(2)
            shutil.rmtree(self.working_dir, ignore_errors=True)
        except OSError as err:
            traceback.print_tb(err.__traceback__)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def info(self, title=None):
        print("title {}".format(title))
        print('module name:', __name__)
        if hasattr(os, 'getppid'):  # only available on Unix
            print('parent process:', os.getppid())
        print('process id:', os.getpid())
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def f(self, name):
        self.info('function f')
        print('hello', name)
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    def testSmallWorkflow(self):
        self.info('main line')
        p = Process(target=self.f, args=('bob',))
        p.start()
        p.join()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['_outcome']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)
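
The same pattern can be checked outside of unittest. The following is a rough sketch under stated assumptions: Worker is a hypothetical class, and its stream attribute plays the role that _outcome plays in the TestCase. It round-trips a bound method through pickle after dropping the unpicklable attribute from the state:

```python
import os
import pickle


class Worker:
    """Hypothetical class that drops an unpicklable attribute when pickled."""

    def __init__(self, name):
        self.name = name
        self.stream = open(os.devnull, "w")  # not picklable

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("stream", None)  # leave the file handle out of the pickled state
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def greet(self):
        return "hello " + self.name


original = Worker("bob")
# Pickling the bound method pickles the instance through __getstate__.
restored_method = pickle.loads(pickle.dumps(original.greet))
original.stream.close()

greeting = restored_method()
has_stream = hasattr(restored_method.__self__, "stream")
```

The restored copy has no stream attribute, so this approach only fits when the code running in the child process never touches the attribute that was removed.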
