多重处理可在Ubuntu中运行,而不适用于Windows [英] Multiprocessing works in Ubuntu, doesn't in Windows

查看:47
本文介绍了多重处理可在Ubuntu中运行,而不适用于Windows的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用此例如作为我的cherrypy应用程序上的排队系统的模板.

I am trying to use this example as a template for a queuing system on my cherrypy app.

我能够将其从python 2转换为python 3(将from Queue import Empty更改为from queue import Empty)并在Ubuntu中执行它.但是当我在Windows中执行它时,出现以下错误:

I was able to convert it from python 2 to python 3 (change from Queue import Empty into from queue import Empty) and to execute it in Ubuntu. But when I execute it in Windows I get the following error:

F:\workspace\test>python test.py
Traceback (most recent call last):
  File "test.py", line 112, in <module>
    broker.start()
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object

F:\workspace\test>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Anaconda3\lib\multiprocessing\spawn.py", line 100, in spawn_main
    new_handle = steal_handle(parent_pid, pipe_handle)
  File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 81, in steal_handle
    _winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect

这是完整的代码:

# from http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html

import sys
import logging
from logging import handlers

from cherrypy.process import wspbus

class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)

    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()

    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)

        self.logger = logger

    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()

    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)



import random
import string
from multiprocessing import Process

class Bank(object):
    def __init__(self, queue):
        self.bus = MyBus(Bank.__name__)
        self.queue = queue
        self.bus.subscribe("main", self.randomly_place_order)
        self.bus.subscribe("exit", self.terminate)

    def randomly_place_order(self):
        order = random.sample(['BUY', 'SELL'], 1)[0]
        code = random.sample(string.ascii_uppercase, 4)
        amount = random.randint(0, 100)

        message = "%s %s %d" % (order, ''.join(code), amount)

        self.bus.log("Placing order: %s" % message)

        self.queue.put(message)

    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)

    def terminate(self):
        self.bus.unsubscribe("main", self.randomly_place_order)
        self.bus.unsubscribe("exit", self.terminate)


from queue import Empty

class Broker(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.queue = queue
        self.bus = MyBus(Broker.__name__)
        self.bus.subscribe("main", self.check)

    def check(self):
        try:
            message = self.queue.get_nowait()
        except Empty:
            return

        if message == "stop":
            self.bus.unsubscribe("main", self.check)
            self.bus.exit()
        elif message.startswith("BUY"):
            self.buy(*message.split(' ', 2)[1:])
        elif message.startswith("SELL"):
            self.sell(*message.split(' ', 2)[1:])

    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)

    def stop(self):
        self.queue.put("stop")

    def buy(self, code, amount):
        self.bus.log("BUY order placed for %s %s" % (amount, code))

    def sell(self, code, amount):
        self.bus.log("SELL order placed for %s %s" % (amount, code))




if __name__ == '__main__':
    from multiprocessing import Queue
    queue = Queue()

    broker = Broker(queue)
    broker.start()

    bank = Bank(queue)
    bank.run()

推荐答案

问题是MyBus对象的某些部分不可腌制,并且您将MyBus的实例保存到Broker实例.由于Windows缺少fork()支持,因此在调用broker.start()时,必须在multiprocessing生成的子进程中腌制并重新创建broker的整个状态,以执行broker.run.它可以在Linux上运行,因为Linux支持fork.在这种情况下,它不需要进行任何腌制-子进程在分叉后立即包含父级的完整状态.

The problem is that parts of the MyBus object are not picklable, and you're saving an instance of MyBus to your Broker instance. Because Windows lacks fork() support, when you call broker.start(), the entire state of broker must be pickled and recreated in the child process that multiprocessing spawns to execute broker.run. It works on Linux because Linux supports fork; it doesn't need to pickle anything in this case - the child process contains the complete state of the parent as soon as it is forked.

有两种方法可以解决此问题.第一种,也是更困难的方法,是使您的broker实例可腌制.为此,您需要使MyBus可腌制.您现在遇到的错误是指MyBus上的logger属性,该属性不可腌制.这个很容易解决;只需将__getstate__/__setstate__方法添加到MyBus,即可用于控制如何对对象进行酸洗/酸洗.如果我们在腌制时删除记录器,而在腌制时重新创建它,我们将解决此问题:

There are two ways to sole this problem. The first, and more difficult, way, is to make your broker instance picklable. To do that, you need to make MyBus picklable. The error you're getting right now refers to the logger attribute on MyBus, which is not picklable. That one is easy to fix; just add __getstate__/__setstate__ methods to MyBus, which are used to control how the object is pickled/unpickled. If we remove the logger when we pickle, and recreate it when we unpickle, we'll work around the issue:

class MyBus(wspbus.Bus):
    ... 
    def __getstate__(self):
        self_dict = self.__dict__
        del self_dict['logger']
        return self_dict

    def __setstate__(self, d):
        self.__dict__.update(d)
        self.open_logger()

这可行,但随后我们遇到了另一个酸洗错误:

This works, but then we hit another pickling error:

Traceback (most recent call last):
  File "async2.py", line 121, in <module>
    broker.start()
  File "C:\python34\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\python34\lib\multiprocessing\context.py", line 212, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\python34\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\python34\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'cherrypy.process.wspbus._StateEnum.State'>: attribute lookup State on cherrypy.process.wspbus failed

转出cherrypy.process.wspbus._StateEnum.State,这是MyBus继承的wspbus.Bus类的一个属性,它是一个嵌套类,不能对嵌套类进行腌制:

Turns outcherrypy.process.wspbus._StateEnum.State, which is an attribute on the wspbus.Bus class inherited by MyBus, is a nested class, and nested classes can't be pickled:

class _StateEnum(object):
    class State(object):
        name = None
        def __repr__(self):
            return "states.%s" % self.name

State对象(惊奇)用于跟踪Bus实例的状态.由于我们是在启动总线之前进行酸洗的,所以我们可以在酸洗时从对象中删除state属性,并在酸洗时将其设置为States.STOPPED.

The State object (surprise) is used to track the Bus instance's state. Since we're doing the pickling before we start up the bus, we could just remove the state attribute from the object when we pickle, and set it to States.STOPPED when we unpickle.

class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)

    def __getstate__(self):
        self_dict = self.__dict__
        del self_dict['logger']
        del self_dict['state']
        return self_dict

    def __setstate__(self, d):
        self.__dict__.update(d)
        self.open_logger()
        self.state = wspbus.states.STOPPED  # Initialize to STOPPED

进行这些更改后,代码将按预期工作!唯一的限制是,如果总线还没有启动,则只能安全地腌制MyBus,这对于您的用例是可以的.

With these changes, the code works as expected! The only limitation is that it's only safe to pickle MyBus if the bus hasn't started yet, which is fine for your usecase.

同样,这是困难的方法.最简单的方法是完全消除使MyBus实例腌制的需要.您可以在子进程中创建MyBus实例,而不是在父进程中创建

Again, this is the hard way. The easy way is to just remove the need to pickle the MyBus instance altogether. You can just create the MyBus instance in the child process, rather than the parent:

class Broker(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.queue = queue

...
    def run(self):
        self.bus = MyBus(Broker.__name__)  # Create the instance here, in the child
        self.bus.subscribe("main", self.check)
        self.bus.start()
        self.bus.block(interval=0.01)

只要您不需要在父级中访问broker.bus,这就是更简单的选择.

As long as you don't need to access broker.bus in the parent, this is the simpler option.

这篇关于多重处理可在Ubuntu中运行,而不适用于Windows的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆