Python multiprocessing on Windows with large arrays

Question

I wrote a script on a Linux platform using Python's multiprocessing module. When I tried running the program on Windows it did not work directly, which I found out is related to how child processes are spawned on Windows: it seems to be crucial that the objects being used can be pickled.

My main problem is that I am using large numpy arrays. It seems that above a certain size they are not picklable any more. To break it down to a simple script, I want to do something like this:

### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

    def reverse_np_array(arr):
        arr = arr + 1
        return arr

    a = np.ndarray((200,1024,1280),dtype=np.uint16)

    def put_into_queue(_Queue,arr):
        _Queue.put(reverse_np_array(arr))


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

I get the following error message:

Traceback (most recent call last):
  File "Windows_multi.py", line 34, in <module>
    Process_list[i].start()
  File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())

So basically I am creating a large array that every process needs for its calculations, and each process should send its result back.

One important thing seems to be to write the function definitions before the if __name__ == '__main__': statement.
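
A minimal restructuring along those lines (keeping the names from the script above) could look like the sketch below. It only fixes the module layout so that the spawned child processes on Windows can find the worker functions; the full array is still pickled once per child, so the memory problem discussed in the answer further down remains.

import numpy as np
import multiprocessing as mp

number_of_processes = 4

# Defined at module level, so the spawned children on Windows can import them.
def reverse_np_array(arr):
    arr = arr + 1
    return arr

def put_into_queue(_Queue, arr):
    _Queue.put(reverse_np_array(arr))

if __name__ == '__main__':
    a = np.ndarray((200, 1024, 1280), dtype=np.uint16)

    Queue_list = [mp.Queue() for _ in range(number_of_processes)]
    Process_list = [mp.Process(target=put_into_queue, args=(Queue_list[i], a))
                    for i in range(number_of_processes)]

    for p in Process_list:
        p.start()

    # Drain the queues before joining; a child whose large result is still
    # stuck in the queue's pipe buffer will not terminate.
    list_of_arrays = [q.get() for q in Queue_list]

    for p in Process_list:
        p.join()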

The whole thing works if I reduce the array to (50,1024,1280). However, even with 4 processes started and 4 cores working, it is slower than running the code without multiprocessing on a single core (on Windows). So I think I have another problem here.

The function in my real program will later live in a Cython module.

I am using the Anaconda distribution with 32-bit Python, since I could not get my Cython package compiled with the 64-bit version (I'll ask about that in a separate thread).

Any help is welcome!

Thanks! Philipp

Update:

The first mistake I made was having the put_into_queue function definition inside __main__.

Then I introduced shared arrays as suggested. However, this uses a lot of memory, and the memory used scales with the number of processes (which of course should not be the case). Any ideas what I am doing wrong here? It does not seem to matter whether I place the definition of the shared array inside or outside __main__, though I think it should be in __main__. I got this from this post: Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
    _Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
    arr = arr + 1 + np.random.rand()
    return arr
if __name__ == '__main__':


    #print shared_arra

    #a = np.ndarray((50,1024,1280),dtype=np.uint16)


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

Answer

You didn't include the full traceback; the end is the most important part. On my 32-bit Python I get the same traceback, which finally ends in

  File "C:\Python27\lib\pickle.py", line 486, in save_string
    self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError is the exception; it says you ran out of memory.
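
To put a rough number on it (using the array shape from the first script above): the array alone is about 500 MB, and a 32-bit process normally has only on the order of 2 GB of address space, so building a pickle of it for each of the four children quickly runs into that limit.

import numpy as np

a = np.ndarray((200, 1024, 1280), dtype=np.uint16)
print(a.nbytes)            # 524288000 bytes
print(a.nbytes / 2.0**20)  # 500.0 MiB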

64-bit Python would get around this, but sending large amounts of data between processes can easily become a serious bottleneck in multiprocessing.
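
If the array really has to be visible to every worker, one option (in the spirit of the shared-array attempt in the update above) is to let each worker write into a single multiprocessing.Array in place and send nothing large back through a Queue, so that no big object ever gets pickled. The following is only a sketch of that idea, not a drop-in fix: the worker function increment_slice and the splitting of the first axis into one chunk per process are assumptions for illustration.

import ctypes
import numpy as np
import multiprocessing as mp

number_of_processes = 4
shape = (20, 1024, 1280)

def increment_slice(shared_base, shape, start, stop):
    # Re-wrap the shared buffer inside the child: this is a view onto the
    # shared memory, not a copy.
    arr = np.ctypeslib.as_array(shared_base.get_obj()).reshape(shape)
    # Work in place on this worker's slice. The slices do not overlap,
    # so no locking is needed, and nothing large is sent back.
    arr[start:stop] += 1

if __name__ == '__main__':
    shared_base = mp.Array(ctypes.c_uint, shape[0] * shape[1] * shape[2])

    # One contiguous chunk of the first axis per process.
    bounds = np.linspace(0, shape[0], number_of_processes + 1).astype(int)

    Process_list = [mp.Process(target=increment_slice,
                               args=(shared_base, shape, bounds[i], bounds[i + 1]))
                    for i in range(number_of_processes)]

    for p in Process_list:
        p.start()
    for p in Process_list:
        p.join()

    # The parent sees the updated data through the same shared buffer.
    result = np.ctypeslib.as_array(shared_base.get_obj()).reshape(shape)

Note that the shared array is created inside the __main__ block and handed to each Process as an argument; on Windows that is how the children get a handle to the same block of shared memory, whereas creating it at module level makes every spawned child allocate its own fresh copy when it re-imports the module, which would explain memory usage that scales with the number of processes.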
