numpy vs. multiprocessing and mmap


Problem Description

I am using Python's multiprocessing module to process large numpy arrays in parallel. The arrays are memory-mapped using numpy.load(mmap_mode='r') in the master process. After that, multiprocessing.Pool() forks the process (I presume).
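
Roughly, the setup looks like this (a simplified sketch of my own; the file name, chunk size, and worker function are placeholders, not my actual code):

import numpy as np
import multiprocessing

def worker(chunk):
    # stand-in for the real per-chunk computation
    return chunk.sum()

if __name__ == '__main__':
    # 'big_array.npy' stands in for the real (much larger) array on disk
    data = np.load('big_array.npy', mmap_mode='r')
    pool = multiprocessing.Pool()
    # slices of the memory-mapped array are handed straight to the pool
    results = pool.map(worker, [data[i:i + 1000] for i in range(0, len(data), 1000)])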

Everything seems to work fine, except I am getting lines like:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in <bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>
     ignored

in the unittest logs. The tests pass fine, nevertheless.

Any idea what's going on there?

Using Python 2.7.2, OS X, NumPy 1.6.1.

Update:

After some debugging, I hunted down the cause to a code path that was using a (small slice of) this memory-mapped numpy array as input to a Pool.imap call.

Apparently the "issue" is with the way multiprocessing.Pool.imap passes its input to the new processes: it uses pickle. This doesn't work with mmap-ed numpy arrays, and something inside breaks, which leads to the error.
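
To see what that means in practice, here is a rough sketch of my own (probe.dat is just a throwaway file created for this check): the pickle copies the raw bytes, and what comes out on the other side only looks like a memmap.

import pickle
import numpy as np

# throwaway scratch file, only for this illustration
np.random.random(1000).tofile('probe.dat')
data = np.memmap('probe.dat', dtype=np.float64, mode='r')

chunk = data[:100]
print(type(chunk))                    # numpy.memmap -- still a view on the file

payload = pickle.dumps(chunk)
print(len(payload) >= chunk.nbytes)   # True: the raw bytes get copied into the pickle

restored = pickle.loads(payload)
print(type(restored))                 # reported as numpy.memmap, but the data is an
                                      # in-memory copy, no longer backed by the file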

I found this reply by Robert Kern which seems to address the same issue. He suggests creating a special code path for when the imap input comes from a memory-mapped array: memory-mapping the same array manually in the spawned process.
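
As I understand it, the suggestion amounts to something like the sketch below (my own rough reconstruction, not code from that reply; the initializer, the module-level global, and the chunk size are all mine):

import numpy as np
import multiprocessing

DATA_PATH = 'data.dat'   # placeholder path to the raw float64 file
CHUNK = 100

def init_worker():
    # each worker process re-maps the file itself, read-only, exactly once
    global shared_data
    shared_data = np.memmap(DATA_PATH, dtype=np.float64, mode='r')

def calculation(bounds):
    start, stop = bounds
    piece = shared_data[start:stop]   # a view on the worker's own mapping
    return piece.mean() - piece.std()

if __name__ == '__main__':
    size = np.memmap(DATA_PATH, dtype=np.float64, mode='r').size
    # only small (start, stop) index pairs are pickled, never array data
    bounds = [(i, min(i + CHUNK, size)) for i in range(0, size, CHUNK)]
    pool = multiprocessing.Pool(initializer=init_worker)
    results = np.fromiter(pool.imap(calculation, bounds), dtype=np.float64)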

This would be so complicated and ugly that I'd rather live with the error and the extra memory copies. Is there any other approach that would require lighter modifications to the existing code?

Recommended Answer

My usual approach (if you can live with extra memory copies) is to do all IO in one process and then send things out to a pool of worker threads. To load a slice of a memmapped array into memory just do x = np.array(data[yourslice]) (data[yourslice].copy() doesn't actually do this, which can lead to some confusion).
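
To see the difference concretely (a quick throwaway check; tiny.dat is just a scratch file for illustration):

import numpy as np

np.random.random(10).tofile('tiny.dat')
data = np.memmap('tiny.dat', dtype=np.float64, mode='r')

in_memory = np.array(data[2:5])
print(type(in_memory))    # <class 'numpy.ndarray'> -- a plain in-memory array

confusing = data[2:5].copy()
print(type(confusing))    # typically still <class 'numpy.memmap'>, even though the
                          # copied data no longer lives in the mapped file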

First off, let's generate some test data:

import numpy as np
np.random.random(10000).tofile('data.dat')

You can reproduce your errors with something like this:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]  # a memmap slice; imap pickles this to send it to a worker

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

And if you just switch to yielding np.array(data[start:stop]) instead, you'll fix the problem:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])  # copy into a plain ndarray before it gets pickled

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

Of course, this does make an extra in-memory copy of each chunk.

In the long run, you'll probably find that it's easier to switch away from memmapped files and move to something like HDF. This is especially true if your data is multidimensional. (I'd recommend h5py, but pyTables is nice if your data is "table-like".)
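
For example, a minimal h5py version might look something like this (just a sketch; the file and dataset names are arbitrary, and you'd still split the reads across workers the same way as above):

import numpy as np
import h5py

# one-time conversion of the raw file into an HDF5 dataset
raw = np.fromfile('data.dat', dtype=np.float64)
with h5py.File('data.h5', 'w') as f:
    f.create_dataset('data', data=raw, chunks=True)

# any process can open the file itself and read only the slice it needs;
# the slice comes back as a plain in-memory ndarray, so it pickles cleanly
with h5py.File('data.h5', 'r') as f:
    piece = f['data'][0:100]
    print(piece.mean() - piece.std())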

Good luck!
