Making my NumPy array shared across processes

Question

I have read quite a few of the questions on SO about sharing arrays and it seems simple enough for simple arrays but I am stuck trying to get it working for the array I have.

import numpy as np
data = np.zeros(250, dtype='float32, (250000,2)float32')

I have tried converting this to a shared array by somehow making mp.Array accept the data; I have also tried creating the array using ctypes, like so:

import multiprocessing as mp
data = mp.Array('c_float, (250000)c_float', 250)

The only way I have managed to get my code working is not passing data to the function but passing an encoded string to be uncompressed/decoded; this, however, ends up spawning n (number of strings) processes, which seems redundant. My desired implementation is based on slicing the list of binary strings into x (number of processes) chunks and passing each chunk, data, and an index to a process. This works, except that data is modified only locally, hence the question of how to make it shared. Any example working with a custom (nested) NumPy array would already be a great help.

PS: This question is a follow-up to Python multi-processing

Answer

Note that you can start out with an array of complex dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

and view it as an array of homogeneous dtype:

In [5]: data2 = data.view('float32')

and later, convert it back to complex dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Changing the dtype is a very quick operation; it does not affect the underlying data, only the way NumPy interprets it. So changing the dtype is virtually costless.

So what you've read about arrays with simple (homogeneous) dtypes can be readily applied to your complex dtype with the trick above.
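
For instance, a quick check (a minimal sketch, not part of the original answer) confirms that the view shares the original array's memory, so a write through the flat view is visible through the structured one:

import numpy as np

data = np.zeros(250, dtype='float32, (250000,2)float32')

# The view reuses the same buffer; no copy is made.
data2 = data.view('float32')
assert np.shares_memory(data, data2)

# A write through the flat float32 view shows up in the structured array...
data2[0] = 1.5
assert data[0]['f0'] == 1.5

# ...and the flat view can be reinterpreted back into the structured dtype.
data3 = data2.view('float32, (250000,2)float32')
assert data3.shape == (250,)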

The code below borrows many ideas from J.F. Sebastian's answer here.

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    """Decode one chunk of base64-encoded binary strings and write the
    unpacked (m/z, intensity) pairs into the shared array."""
    chunk, counter = arg
    print(len(chunk), counter)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    # The shared buffer holds ctypes.c_float values, so read it back as float32.
    return np.frombuffer(mp_arr.get_obj(), dtype=np.float32)


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = len(peaks) // processors
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive. 
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)
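
    # Not part of the original answer: a minimal sketch of reading the result
    # back in the parent once pool.map() has returned. The flat shared buffer
    # is reinterpreted with the same structured dtype used inside decode().
    result = tonumpyarray(shared_arr).view(
        [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
    print(result.shape)         # (250,)
    print(result[0]['f1'][:5])  # first five (m/z, intensity) pairs of record 0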

If you can guarantee that the various processes which execute the assignments

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

never compete to alter the data in the same locations, then I believe you can actually forgo using the lock

with shared_arr.get_lock():

but I don't grok your code well enough to know for sure, so to be on the safe side, I included the lock.
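
If that guarantee does hold, one possible lock-free variant (a sketch, not part of the original answer) is to allocate the buffer with multiprocessing.RawArray, which carries no lock and exposes the buffer protocol, so it can be wrapped with np.frombuffer directly:

import ctypes
import multiprocessing as mp
import numpy as np

# RawArray has no synchronization wrapper: no .get_lock() / .get_obj(),
# and no locking overhead on every access.
shared_arr = mp.RawArray(ctypes.c_float, (250000 * 2 * 250) + 250)


def tonumpyarray(raw_arr):
    # RawArray supports the buffer protocol, so this makes no copy.
    return np.frombuffer(raw_arr, dtype=np.float32)


# Each worker would write only to its own disjoint slice of this view,
# so no lock is needed.
data = tonumpyarray(shared_arr).view(
    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])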
