Python multi-processing


Problem description

I have a large list of binary-encoded strings that I used to process in a single function, like so:

""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')

def func numpy_array(data, peaks):
rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
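
For context, the inner pack/unpack round-trip in the loop reinterprets each big-endian 32-bit word (mzXML stores peaks in network byte order) as a native float. A minimal, self-contained demonstration with a made-up value:

import struct

raw=struct.pack(">f",1.5)                          # 4 bytes of a big-endian float
word=struct.unpack(">L",raw)[0]                    # the same 4 bytes read as a big-endian uint32
value=struct.unpack("f",struct.pack("I",word))[0]  # repack natively, reinterpret as float
print value                                        # 1.5
print struct.unpack(">f",raw)[0]                   # 1.5 -- the one-step equivalent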

I have been reading up on multiprocessing and figured I would try it to see if I could get a big increase in performance, so I rewrote my function into two (a helper and a 'caller'), like so:

def numpy_array(data, peaks):
    processors=mp.cpu_count() #Might as well throw this directly in the mp.Pool (just for clarity for now)
    pool = mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        pool.map(decode(data,chunk,counter))

def decode(data,chunk,counter):
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1      

The program runs but only uses 100-110% of CPU (according to top), and once it should be finished it throws TypeError: map() takes at least 3 arguments (2 given) at me. Could anyone with more multiprocessing experience give me a hint as to what to look out for (what could be causing the TypeError)? And what might be causing my low CPU usage?

- Code after incorporating the answer -

def decode((data,chunk,counter)):
    print len(chunk), counter
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

def numpy_array(data, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors=mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size=int(len(peaks)/processors)
    map_parameters=[]
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter))
    pool.map(decode,map_parameters) 

This latest version 'works' in so far as it fills the array inside the processes (where the array does contain values), but once all processes are done, accessing the array yields only zeros, because each process gets its own local copy of the array.
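
For reference, a minimal sketch (my own rework, not from the thread) of one way around that: have the helper return its results and let the parent process copy them into data itself. Names and chunking are illustrative, any remainder of peaks that does not divide evenly across workers is silently dropped here, and ">%df" unpacks the big-endian floats in one step:

import base64
import struct
import multiprocessing as mp

def decode((chunk, offset)):
    """Decode one chunk of b64 strings and return (offset, rows)
    instead of mutating a copy of 'data' in the worker."""
    rows=[]
    for x in chunk:
        data_buff=base64.b64decode(x)
        values=struct.unpack(">%df" % (len(data_buff)/4),data_buff)
        rows.append(zip(values[0::2],values[1::2]))  # (m/z, intensity) pairs
    return offset,rows

def numpy_array(data, peaks):
    processors=mp.cpu_count()
    pool=mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    jobs=[(peaks[i*chunk_size:(i+1)*chunk_size],i*chunk_size)
          for i in range(processors)]
    for offset,rows in pool.map(decode,jobs):  # results come back to the parent
        for j,pairs in enumerate(rows):
            for k,(mz,intensity) in enumerate(pairs):
                data[offset+j][1][k][0]=mz
                data[offset+j][1][k][1]=intensity
    pool.close()
    pool.join()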

Answer

Something like this should work.

Note that pool.map takes a function and a list of parameters for that function, one entry per call. In your original example you are just calling the function yourself inside numpy_array and handing its return value to pool.map.
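
As a toy illustration (not from the original thread) of the two call shapes:

import multiprocessing as mp

def square(x):
    return x*x

if __name__ == '__main__':
    pool=mp.Pool(processes=4)
    print pool.map(square,[1,2,3,4])  # correct: prints [1, 4, 9, 16], one call per item
    #pool.map(square(2))              # wrong: square runs first, so map() gets only one argument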

The function may only take a single argument, hence the packing of the arguments into a tuple and the rather odd-looking double parentheses in decode (this is called tuple unpacking).
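
As an aside, that tuple-parameter syntax only exists in Python 2; it was removed in Python 3 (PEP 3113). The portable spelling takes a single argument and unpacks it on the first line of the function:

def decode(args):
    data,chunk,counter=args  # manual unpacking, works in Python 2 and 3
    # ... same body as above ...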

def numpy_array(data, peaks):
    processors=4
    pool = mp.Pool(processes=processors)
    chunk_size=len(data)/processors
    print range(processors)
    map_parameters = [] # new
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter)) # new
    pool.map(decode, map_parameters) # new

def decode((data,chunk,counter)): # changed
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1
