Passing shared memory variables in python multiprocessing


Problem description

I have a bunch of files that I want to read in parallel using Python's multiprocessing and collect all the data in a single NumPy array. For this purpose, I want to define a shared-memory NumPy array and pass its slices to different processes to read in parallel. A toy illustration of what I am trying to do is given in the following code, where I try to modify a numpy array using multiprocessing.

Example 1:


import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    # Need to fill this array in parallel
    arr = np.zeros(4)
    p = multiprocessing.Pool(4)
    # Passing slices to arr to modify using multiprocessing
    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

In this code, I want arr to be filled with 0, 1, 2, 3. However, this prints arr as all zeros. After reading the answers here, I used multiprocessing.Array to define the shared memory variable and modified my code as follows:

Example 2:

import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    # Shared memory Array
    shared = multiprocessing.Array('d', 4)
    arr = np.ctypeslib.as_array(shared.get_obj())

    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

This also prints all zeros for arr. However, when I define the array outside of main and use pool.map, the code works. For example, the following code works:

Example 3:

import numpy as np
import multiprocessing

shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())

def do_stuff(i):
    arr[i]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    shared = multiprocessing.Array('d', 4)
    p.map(do_stuff, idx)
    p.close()
    p.join()
    print(arr)
             

This prints [0, 1, 2, 3].

I am very confused by all this. My questions are:

  1. When I define arr = np.zeros(4), which process owns this variable? When I then send a slice of this array to different processes, what is being sent if this variable is not defined in those processes?

  2. Why doesn't example 2 work while example 3 does?

I am working on Linux with Python 3.7.4.

Answer

When I define arr = np.zeros(4), which process owns this variable?

Only the main process has access to this. If you use "fork" for the start method, everything will be accessible to the child process, but as soon as something tries to modify it, it will be copied to its own private memory space before being modified (copy-on-write). This reduces overhead if you have large read-only arrays, but doesn't help you much for writing data back to those arrays.
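To see the copy-on-write behaviour in isolation, here is a minimal sketch (assuming a fork-capable platform such as Linux; the child function and variable names are illustrative, not part of the question):

import numpy as np
import multiprocessing

def child(a):
    # Under "fork" the child starts with the parent's memory, but this write
    # only touches the child's own copy-on-write pages.
    a[:] = 99

if __name__ == '__main__':
    data = np.zeros(4)
    p = multiprocessing.Process(target=child, args=(data,))
    p.start()
    p.join()
    print(data)  # still [0. 0. 0. 0.] in the parent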

what is being sent if this variable is not defined in those processes?

A new array is created within the child process when the arguments are re-constructed after being sent from the main process via a pipe and pickle. The data is serialized (pickled) and re-constructed, so no information other than the values of the data in the slice remains; it's a totally new object.
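As a rough illustration of that point (plain pickle here, not the Pool machinery itself), round-tripping a slice the way multiprocessing sends arguments yields an array that no longer shares memory with the original:

import pickle
import numpy as np

arr = np.zeros(4)
view = arr[1:2]                          # a view sharing arr's memory in this process
copy = pickle.loads(pickle.dumps(view))  # what the child effectively receives

copy[:] = 7
print(arr)  # still all zeros: the reconstructed object owns its own buffer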

Why doesn't example 2 work while example 3 does?

Example 3 works because at the time of the "fork" (the moment you call Pool), arr has already been created, and will be shared. It's also important that you used an Array to create it, so when you attempt to modify the data, the data is shared (the exact mechanics of this are complicated).
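A minimal sketch of that situation, assuming the fork start method (the Linux default); the worker function is illustrative, not from the question:

import numpy as np
import multiprocessing

if __name__ == '__main__':
    shared = multiprocessing.Array('d', 4)         # created before the fork
    arr = np.ctypeslib.as_array(shared.get_obj())

    def worker(i):
        arr[i] = i                                 # writes land in the shared buffer

    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(arr)  # [0. 1. 2. 3.] with "fork"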

Example 2 does not work for the same reason example 1 does not work: you pass a slice of the array as an argument, and it gets converted into a totally new object, so arr inside your do_stuff function is just a copy of arr[i:i+1] from the main process. It is still important to create anything that will be shared between processes before calling Pool (if you're relying on "fork" to share the data), but that's not why this example doesn't work.

You should know: example 3 only works because you're on Linux, where the default start method is fork. This is not the preferred start method, due to the possibility of deadlocks from copying lock objects in a locked state. It will not work on Windows at all, and won't work on macOS by default on 3.8 and above.
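If you want to check which start method you are actually getting, or force the portable behaviour while testing, something along these lines works (a sketch; set_start_method may be called at most once, before any Pool or Process is created):

import multiprocessing

if __name__ == '__main__':
    print(multiprocessing.get_start_method())    # 'fork' on Linux, 'spawn' on Windows and newer macOS
    # multiprocessing.set_start_method('spawn')  # uncomment to reproduce the non-fork behaviour on Linux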

The best solution (most portable) to all of this is to pass the Array itself as the argument, and re-construct the numpy array inside the child process. This has the complication that "shared objects" can only be passed as arguments at the creation of the child process. This isn't as big a deal if you use Process, but with Pool you basically have to pass any shared objects as arguments to an initialization function, and get the re-constructed array as a global variable of the child's scope. In this example, for instance, you will get an error trying to pass buf as an argument with p.map or p.apply, but not when passing buf as initargs=(buf,) to Pool().

import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr #use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i]=i

if __name__ == '__main__':
    idx = [0,1,2,3]
    
    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0
    
    #"with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
    print(arr)

With 3.8 and above there's a new module called shared_memory, which is better than Array or any of the other sharedctypes classes. It is a bit more complicated to use, and has some additional OS-dependent nastiness, but it's theoretically lower overhead and faster. If you want to go down that rabbit hole, I've written a few answers on the topic of shared_memory, and have recently been answering lots of questions on concurrency in general, if you want to take a gander at my answers from the last month or two.
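For completeness, here is a rough sketch of the same pattern with multiprocessing.shared_memory on 3.8+ (the function names mirror the example above for illustration; worker-side cleanup and error handling are omitted):

import numpy as np
from multiprocessing import Pool, shared_memory

def init_child(shm_name, shape, dtype):
    global arr, shm
    shm = shared_memory.SharedMemory(name=shm_name)       # attach to the existing block
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)  # keep shm referenced so the buffer stays valid

def do_stuff(i):
    arr[i] = i

if __name__ == '__main__':
    shm = shared_memory.SharedMemory(create=True, size=4 * np.dtype('d').itemsize)
    arr = np.ndarray((4,), dtype='d', buffer=shm.buf)
    arr[:] = 0

    with Pool(4, initializer=init_child, initargs=(shm.name, (4,), 'd')) as p:
        for i in range(4):
            p.apply(do_stuff, args=(i,))
    print(arr)

    shm.close()   # release this process's handle
    shm.unlink()  # free the underlying block (creator does this once)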

