How to parallelize a sum calculation in python numpy?


Question

I have a sum that I'm trying to compute, and I'm having difficulty parallelizing the code. The calculation I'm trying to parallelize is kind of complex (it uses both numpy arrays and scipy sparse matrices). It spits out a numpy array, and I want to sum the output arrays from about 1000 calculations. Ideally, I would keep a running sum over all the iterations. However, I haven't been able to figure out how to do this.

So far, I've tried using joblib's Parallel function and the pool.map function with python's multiprocessing package. For both of these, I use an inner function that returns a numpy array. These functions return a list, which I convert to a numpy array and then sum over.

However, after the joblib Parallel function completes all iterations, the main program never continues running (it looks like the original job is in a suspended state, using 0% CPU). When I use pool.map, I get memory errors after all the iterations are complete.

Is there a way to simply parallelize a running sum of arrays?

Edit: The goal is to do something like the following, except in parallel.

import numpy as np

def summers(num_iters):
    sumArr = np.zeros((1,512*512)) #initialize sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr

Answer

I figured out how to parallelize a sum of arrays with multiprocessing, apply_async, and callbacks, so I'm posting this here for other people. I used Parallel Python's example page for the Sum callback class, although I did not actually use that package for the implementation. It did give me the idea of using callbacks, though. Here's the simplified code for what I ended up using, and it does what I wanted it to do.

import multiprocessing
import threading
import numpy as np

class Sum: #again, this class is from Parallel Python's example code (I modified it for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = threading.Lock() #threading replaces the Python 2 thread module
        self.count = 0

    def add(self, value):
        with self.lock: #lock so the sum is correct if two results arrive at the same time
            self.count += 1 #the count update belongs inside the lock too
            self.value += value #the actual summation

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of callback class and zero the sum
    for index in range(num_iters):
        singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value
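As a compact, self-contained check of this callback pattern: the sketch below shrinks the array to 1 x 4 and uses a stand-in `computation`, so the shapes and the two-process pool are illustrative choices only, not part of the original answer.

```python
import multiprocessing
import threading
import numpy as np

class Sum:
    """Accumulates arrays delivered by apply_async callbacks."""
    def __init__(self, shape):
        self.value = np.zeros(shape)
        self.lock = threading.Lock()

    def add(self, value):
        # callbacks fire in the pool's result-handler thread, so the
        # lock is cheap insurance rather than a hot contention point
        with self.lock:
            self.value += value

def computation(index):
    return np.ones((1, 4)) * index  # stand-in for the real calculation

def summers(num_iters):
    total = Sum((1, 4))
    pool = multiprocessing.Pool(processes=2)
    for index in range(num_iters):
        pool.apply_async(computation, (index,), callback=total.add)
    pool.close()
    pool.join()  # waits for workers and remaining callbacks to finish
    return total.value

if __name__ == '__main__':
    print(summers(10))  # sums 0 + 1 + ... + 9 = 45 in every slot
```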

I was also able to get this working using a parallelized map, which was suggested in another answer. I had tried this earlier, but I wasn't implementing it correctly. Both ways work, and I think this answer explains the issue of which method to use (map or apply_async) pretty well. For the map version, you don't need to define the Sum class, and the summers function becomes

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
    sumArr = np.zeros((1,512*512))              #but I do to make sure I have the memory

    outputArr = np.array(pool.map(computation, range(num_iters)))
    sumArr = outputArr.sum(0)

    pool.close() #not sure if this is still needed since map waits for all iterations

    return sumArr
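If memory is the bottleneck, as it was in the original pool.map attempt, one variant worth sketching is Pool.imap_unordered, which hands results back one at a time so the running sum never needs all the output arrays in memory at once. This is an addition beyond the original answer, again with a toy 1 x 4 stand-in `computation`:

```python
import multiprocessing
import numpy as np

def computation(index):
    return np.ones((1, 4)) * index  # stand-in for the real calculation

def summers(num_iters):
    sumArr = np.zeros((1, 4))
    pool = multiprocessing.Pool(processes=2)
    # imap_unordered yields each result as a worker finishes, so only
    # one output array is alive at a time instead of all num_iters
    for arr in pool.imap_unordered(computation, range(num_iters)):
        sumArr += arr
    pool.close()
    pool.join()
    return sumArr

if __name__ == '__main__':
    print(summers(10))  # 0 + 1 + ... + 9 = 45 in every slot
```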

