Sharing numpy arrays in python multiprocessing pool


Question

I'm working on some code that does some fairly heavy numerical work on a large (tens to hundreds of thousands of numerical integrations) set of problems. Fortunately, these integrations are embarrassingly parallel, so it's easy to use Pool.map() to split up the work across multiple cores.

Right now, I have a program that has this basic workflow:

#!/usr/bin/env python
from multiprocessing import Pool
from scipy import *
from my_parser import parse_numpy_array
from my_project import heavy_computation

#X is a global multidimensional numpy array
X = parse_numpy_array("input.dat")
param_1 = 0.0168
param_2 = 1.505

def do_work(arg):
  return heavy_computation(X, param_1, param_2, arg)

if __name__=='__main__':
  pool = Pool()
  arglist = linspace(0.0,1.0,100)
  results = pool.map(do_work, arglist)
  #save results in a .npy file for analysis
  save("Results", [X,results])

Since X, param_1, and param_2 are hard-coded and initialized in exactly the same way for each process in the pool, this all works fine. Now that I have my code working, I'd like to make it so that the file name, param_1, and param_2 are input by the user at run-time, rather than being hard-coded.

One thing that should be noted is that X, param_1, and param_2 are not modified as the work is being done. Since I don't modify them, I could do something like this at the beginning of the program:

import sys
X = parse_numpy_array(sys.argv[1])
param_1 = float(sys.argv[2])
param_2 = float(sys.argv[3])

And that would do the trick, but since most users of this code are running the code from Windows machines, I'd rather not go the route of command-line arguments.

What I would really like to do is something like this:

X, param_1, param_2 = None, None, None

def init(x, p1, p2):
  X = x
  param_1 = p1
  param_2 = p2

if __name__=='__main__':
  filename = raw_input("Filename> ")
  param_1 = float(raw_input("Parameter 1: "))
  param_2 = float(raw_input("Parameter 2: "))
  X = parse_numpy_array(filename)
  pool = Pool(initializer = init, initargs = (X, param_1, param_2,))
  arglist = linspace(0.0,1.0,100)
  results = pool.map(do_work, arglist)
  #save results in a .npy file for analysis
  save("Results", [X,results])

But, of course, this fails and X/param_1/param_2 are all None when the pool.map call happens. I'm pretty new to multiprocessing, so I'm not sure why the call to the initializer fails. Is there a way to do what I want to do? Is there a better way to go about this altogether? I've also looked at using shared data, but from my understanding of the documentation, that only works on ctypes, which don't include numpy arrays. Any help with this would be greatly appreciated.
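
For reference, a shared ctypes buffer can in fact be viewed as a numpy array through numpy.frombuffer, so the shared-data route is not completely ruled out. Below is a minimal, hypothetical sketch (the names shared_buf, X_shared, and read_only_worker are illustrative, not from the original code), assuming a fork-based start method (Linux/macOS) so that worker processes inherit the module-level globals:

from multiprocessing import Pool, Array
import numpy as np

# Allocate raw shared memory (6 doubles) and view it as a numpy array.
# With lock=False, Array returns a plain ctypes array that supports the
# buffer protocol, so numpy.frombuffer yields a view on the same memory.
shared_buf = Array('d', 6, lock=False)
X_shared = np.frombuffer(shared_buf, dtype=np.float64)
X_shared[:] = np.arange(6.0)  # fill it once in the parent process

def read_only_worker(i):
    # Workers inherit X_shared via fork and read it without pickling a copy.
    return X_shared[i] * 2.0

if __name__ == '__main__':
    pool = Pool()
    print(pool.map(read_only_worker, range(6)))  # [0.0, 2.0, ..., 10.0]
    pool.close()
    pool.join()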

Answer

I had a similar problem. If you just want to read my solution, skip some lines :) I had to:

  • share a numpy.array between the threads operating on different parts of it, and ...
  • pass Pool.map a function that takes more than one argument.

I noticed that:

  • the data of the numpy.array was read correctly, but ...
  • changes made to the numpy.array were not permanent
  • Pool.map had problems handling lambda functions, or so it appeared to me (if this point is not clear to you, just ignore it; a short sketch of the pickling issue follows this list)
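
To illustrate the lambda point: Pool.map has to pickle the target function in order to ship it to the worker processes, and lambdas (like nested functions) cannot be pickled by the standard pickle module. A minimal sketch of just the pickling step, separate from the original code:

import pickle

def module_level_function(x):
    # Plain module-level functions pickle by reference (module name + function name).
    return x * 2

pickle.dumps(module_level_function)    # works
try:
    pickle.dumps(lambda x: x * 2)      # fails: a lambda has no importable name
except pickle.PicklingError as err:
    print("lambda could not be pickled: %s" % err)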

My solution was to:

  • make the target function's only argument a list
  • make the target function return the modified data instead of directly trying to write to the numpy.array

I understand that your do_work function already returns the computed data, so you would just have to modify do_work to accept a list (containing X, param_1, param_2, and arg) as its argument, and to pack the input in this format before passing it to Pool.map.

Here is an example implementation:

def do_work2(args):
    X,param_1,param_2,arg = args
    return heavy_computation(X, param_1, param_2, arg)

Now you have to pack the input for the do_work2 function before calling it. Your main becomes:

if __name__=='__main__':
   filename = raw_input("Filename> ")
   param_1 = float(raw_input("Parameter 1: "))
   param_2 = float(raw_input("Parameter 2: "))
   X = parse_numpy_array(filename)
   # now you pack the input arguments
   arglist = [[X, param_1, param_2, n] for n in linspace(0.0, 1.0, 100)]
   # consider that you're not making 100 copies of X here; each item just holds a reference to it
   pool = Pool()
   results = pool.map(do_work2, arglist)
   #save results in a .npy file for analysis
   save("Results", [X,results])
