Python multiprocessing.Queue modifies objects

Question

I have an application that implements something like a Chain of Responsibility in Python. One process passes objects via multiprocessing.Queue() to other processes, which then perform actions on the objects. It is also important to track the last-modified time of each passed object, so that action is taken only when the object has been modified.

The problem I am experiencing is that the _modified attribute of the object appears to change randomly after it is extracted from the queue. However, the _mtime attribute is always correct. The example below runs and (intentionally) randomly modifies the DummyObject, then places it on the Queue of each handler process. Each handler then prints the _modified and _mtime values it received in the object. I expect the _modified value to be the same in command_func and in the handler functions; however, that is usually not the case. If I remove the Object_w_mtime inheritance from DummyObject, I do not see any differences between the sent and received objects.

I'm relatively new to Python. To the best of my knowledge, each time an object is placed on a queue it is pickled, then sent over a pipe to the receiving process, which unpickles it. Is that correct? Is there any way that the object's inheritance could be messed up when the object is pickled/unpickled?
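
One way to check the pickling part of the question in isolation is a single-process round trip through the pickle module. The sketch below assumes Python 3 syntax (the question's code is Python 2) and reuses the question's two classes; the variable names obj, data and restored are illustrative. It suggests that pickling keeps the class hierarchy and captures the attribute values at the moment pickle.dumps() is called, so later changes to the original object do not reach the pickled bytes.

```python
import pickle
import time


class Object_w_mtime(object):
    """Parent object that tracks the last time an attribute was modified."""

    def __setattr__(self, a_name, a_value):
        if a_name not in ('_mtime', '_modified') and a_value != getattr(self, a_name, None):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, a_name, a_value)

    def reset(self):
        self._modified = False


class DummyObject(Object_w_mtime):
    def __init__(self):
        self.value = 10      # first assignment marks the object as modified


obj = DummyObject()
data = pickle.dumps(obj)      # the object's state is captured here
obj.reset()                   # this later change affects only the original
restored = pickle.loads(data)

print(isinstance(restored, Object_w_mtime), restored._modified, obj._modified)
# True True False
print(restored._mtime == obj._mtime)
# True
```

By default, unpickling restores the instance dictionary directly instead of going through __setattr__, so _mtime is not refreshed on the receiving side either.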

I tested this with Python 2.7.2 and 2.6.7 on Ubuntu 11.10, as well as Python 2.7.1 on Ubuntu 11.04. Sometimes you have to let it run for a minute or so to see the behavior, as it appears to be random.

Grasping at straws here; thanks.

import multiprocessing
import time
import traceback
import os
import random

class Object_w_mtime(object):
    '''
    Parent object that tracks the last time an attribute was modified
    '''
    def __setattr__(self,a_name,a_value):
        if ((a_name not in ('_mtime','_modified')) and
            (a_value != getattr(self,a_name,None))
        ):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, a_name, a_value)
        return True
    #END def

    def reset(self):
        self._modified = False
#END class

class DummyObject(Object_w_mtime):
    def __init__(self):
        self.value = 10

def handler(in_queue = None, handler_id = None):
    print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id)
    while True:
        try:
            obj = in_queue.get(True,61)
            print 'handler{0} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print traceback.format_exc()
            break
    return True
#END def

def command_func(next_links = None):
    print 'PID:' + str(os.getpid()) + ':command_func:<RUN>'
    obj = DummyObject()
    while True:
        try:
            # randomly assign a different value to test with a modified and unmodified object
            obj.value = random.randint(0,1)
            print '**************** obj.value = {0} ***************'.format(obj.value)
            print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime
            for each in next_links:
                each.put(obj,False)
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print e
            print traceback.format_exc()
            break
        obj.reset()
        time.sleep(3)
    return True
#END def


if __name__ == '__main__':
    handler_queues = list()
    handler_processes = list()
    # Create a queue and process object for each command handler
    for handler_id in range(1,4):
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=handler, args=(queue, handler_id))
        handler_queues.append(queue)
        handler_processes.append(process)

    try:
        # spawn handler processes
        for process in handler_processes:
            process.start()
        # Start sending commands to handlers
        command_func(handler_queues)

    # exit on keyboard interrupt
    except KeyboardInterrupt:
        for process in handler_processes:
            process.join()
    except Exception:
        traceback.print_exc()

Recommended answer

In short, you modify obj after putting it on the queue.

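Looking at http://svn.python.org/view/python/trunk/Lib/multiprocessing/queues.py?revision=76434&view=markup line 285, put() merely places the object in an internal queue and, if it is not already running, launches a background (feeder) thread to process objects from that queue. The object is pickled later, in that thread, not inside put(). Thus there is a race between each.put(obj,False) and obj.reset() in your code: if reset() runs before the feeder thread reaches obj, the handlers receive _modified == False. _mtime is always correct because reset() never changes it.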

You should probably only put immutable objects, or copies of your objects, on a Queue.
