System error while running subprocesses using Multiprocessing


Problem description

I am getting a system error (shown below) while performing some simple numpy-based matrix algebra calculations in parallel using the multiprocessing package (Python 2.7.3 with numpy 1.7.0 on Ubuntu 12.04 on Amazon EC2). My code works fine for smaller matrix sizes but crashes for larger ones (with plenty of available memory).

The matrices I use are substantial in size (my code runs fine for 1000000x10 dense float matrices but crashes for 1000000x500 ones; I am passing these matrices to/from the subprocesses, by the way). The 10 vs. 500 is a run-time parameter; everything else stays the same (input data, other run-time parameters, etc.).

I've also tried to run the same (ported) code using python3 - for larger matrices the subprocesses go into a sleep/idle mode (instead of crashing as in python 2.7) and the program/subprocesses just hang there doing nothing. For smaller matrices the code runs fine with python3.

Any suggestions would be highly appreciated (I am running out of ideas here)

Error message:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
SystemError: NULL result without error in PyObject_Call

The multiprocessing code I use:

import multiprocessing as mp

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add the result queue to each argument tuple.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create the pool of workers and run the processes.
    pool = mp.Pool(processes=nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    # Wait for the workers to finish.
    pool.close()
    pool.join()
    # Collect the results (queue order is completion order, not input order).
    return [resultQueue.get() for i in range(len(listOfInputs))]
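
For illustration, a minimal, hypothetical way to call this helper might look as follows; the worker squareAndReport is invented for this sketch (it is not part of the original code) and must live at module level so it can be pickled:

import multiprocessing as mp

def squareAndReport(param):
    # Unpack the (argumentTuple, resultQueue) pair built by the helper above.
    (x,), queue = param
    queue.put(x * x)

if __name__ == '__main__':
    inputs = [(i,) for i in range(4)]
    # Hypothetical call; results come back in completion order, not input order.
    print runProcessesInParallelAndReturn(squareAndReport, inputs, 2)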

Below is the "proc" that gets executed for each subprocess. Basically, it solves many systems of linear equations using numpy (it constructs required matrices inside the subprocess) and returns the results as another matrix. Once again, it works fine for smaller values of one run-time parameter but crashes (or hangs in python3) for larger ones.

import sys
import time
import numpy as np

def solveForLFV(param):
    startTime = time.time()
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        # getChunkBoundaries is the asker's helper, defined elsewhere in their code.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            # Accumulate the sum of outer products over this chunk of rows.
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
        # Solve one linear system per output row.
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV: ', time.time() - startTime, 'sec'
    sys.stdout.flush()

Recommended answer

500,000,000 elements is pretty big: with float64 that's 4 billion bytes, or about 4 GB. (The 10,000,000-element array would be 80 million bytes, or about 80 MB - much smaller.) I expect the problem has to do with multiprocessing pickling the arrays to send them to the subprocesses over a pipe; in Python 2, pickling payloads of several gigabytes is known to fail with errors of exactly this kind.
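
For reference, the arithmetic can be checked without allocating anything; a minimal sketch using the shapes from the question:

import numpy as np

itemsize = np.dtype(np.float64).itemsize  # 8 bytes per element
print 1000000 * 500 * itemsize  # 4000000000 bytes, about 4 GB
print 1000000 * 10 * itemsize   # 80000000 bytes, about 80 MB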

Since you're on a unix platform, you can avoid this behavior by exploiting the memory inheritance behavior of fork() (which multiprocessing uses to create its workers). I've had great success with this hack (ripped out of this project), described by the comments below.

import itertools
import os
import random
import string

### A helper for letting the forked processes use data without pickling.
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10))
    for _ in itertools.count())
class ForkedData(object):
    '''
    Class used to pass data to child processes in multiprocessing without
    really pickling/unpickling it. Only works on POSIX.

    Intended use:
        - The master process makes the data somehow, and does e.g.
            data = ForkedData(the_value)
        - The master makes sure to keep a reference to the ForkedData object
          until the children are all done with it, since the global reference
          is deleted to avoid memory leaks when the ForkedData object dies.
        - Master process constructs a multiprocessing.Pool *after*
          the ForkedData construction, so that the forked processes
          inherit the new global.
        - Master calls e.g. pool.map with data as an argument.
        - Child gets the real value through data.value, and uses it read-only.
    '''
    # TODO: does data really need to be used read-only? don't think so...
    # TODO: more flexible garbage collection options
    def __init__(self, val):
        g = globals()
        self.name = next(n for n in _data_name_cands if n not in g)
        g[self.name] = val
        self.master_pid = os.getpid()

    @property
    def value(self):
        return globals()[self.name]

    def __del__(self):
        if os.getpid() == self.master_pid:
            del globals()[self.name]
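
A minimal usage sketch following the docstring's recipe; workerProc and big_matrix are hypothetical names invented for this example, not part of the original answer:

import multiprocessing as mp
import numpy as np

def workerProc(args):
    chunkI, forked = args
    data = forked.value  # the fork-inherited array; only the tiny ForkedData wrapper is pickled
    return chunkI, data[chunkI].sum()

if __name__ == '__main__':
    big_matrix = np.ones((1000, 500))  # stand-in; in practice this is the large 1000000x500 array
    forked = ForkedData(big_matrix)    # keep this reference alive until the workers finish
    pool = mp.Pool(processes=4)        # the Pool is created *after* ForkedData, as required
    results = pool.map(workerProc, [(i, forked) for i in range(4)])
    pool.close()
    pool.join()
    print results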
