Use numpy array in shared memory for multiprocessing


Problem description

I would like to use a numpy array in shared memory with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

from multiprocessing import Process, Array
import numpy as np

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)
    arr = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (arr[:2],))

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % (arr[:2],))

This produces output like the following:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don't believe it would still be shared.

It seems there should be a standard solution to what has to be a common problem.
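
For reference, the conversion need not break sharing: numpy can build a view over the same shared buffer instead of a copy, so a child's writes stay visible in the parent. A minimal sketch anticipating the answer below (variable names are illustrative):

from multiprocessing import Process, Array
import numpy as np

def f(shared):
    a = np.frombuffer(shared.get_obj())  # numpy view over the shared buffer
    a[0] = -a[0]                         # numpy-style write, no copy

if __name__ == '__main__':
    arr = Array('d', np.random.rand(10))
    before = arr[0]
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()
    assert arr[0] == -before  # the child's write is visible: still shared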

Recommended answer

To add to @unutbu's (not available anymore) and @Henry Gomersall's answers: you could use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # i could be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Example

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()
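
One detail worth noting: np.frombuffer() defaults to float64, which happens to match the ctypes.c_double buffer used above. For any other element type, pass the dtype explicitly, or the view will fail or reinterpret the bytes. A minimal sketch of this point (not part of the original answer; names are illustrative):

import ctypes
import multiprocessing as mp

import numpy as np

shared_ints = mp.Array(ctypes.c_int, 5)
# pass the matching dtype; the default float64 would not fit a c_int buffer
view = np.frombuffer(shared_ints.get_obj(), dtype=ctypes.c_int)
view[:] = [1, 2, 3, 4, 5]
print(view.sum())  # numpy operations now work on the shared buffer -> 15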

If you don't need synchronized access, or if you create your own locks, then mp.Array() is unnecessary. You could use mp.sharedctypes.RawArray in this case.
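
For completeness, a minimal sketch of the RawArray variant (the negate helper is illustrative); it assumes every task writes to a distinct index, so no locking is required:

import ctypes
import multiprocessing as mp

import numpy as np
from multiprocessing.sharedctypes import RawArray

def init(raw_arr):
    global shared_raw
    shared_raw = raw_arr  # made available to each worker via the initializer

def negate(i):
    arr = np.frombuffer(shared_raw)  # float64 view, no copy, no lock
    arr[i] = -arr[i]

if __name__ == '__main__':
    raw = RawArray(ctypes.c_double, 10)
    arr = np.frombuffer(raw)
    arr[:] = np.random.uniform(size=10)
    with mp.Pool(initializer=init, initargs=(raw,)) as p:
        p.map(negate, range(10))  # each index is touched by exactly one task
    print(arr)  # all elements negated in place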
