Parallelize loop over numpy rows


Problem description

I need to apply the same function to every row of a numpy array and store the result in a numpy array again.

import numpy as np

# states will contain results of function applied to a row in array
states = np.empty_like(array)

for i, ar in enumerate(array):
    states[i] = function(ar, *args)

# do some other stuff on states

function does some non-trivial filtering of my data and returns an array recording where the conditions are True and where they are False. function can be either pure Python or Cython-compiled. The filtering operations on the rows are complicated and can depend on previous values in the row, which means I can't operate on the whole array in an element-by-element fashion.

Is there a way to do something like this in dask, for example?

Recommended answer

Dask solution

You can do this with dask.array by chunking the array row by row, calling map_blocks, and then computing the result:

import dask.array as da

ar = ...
x = da.from_array(ar, chunks=(1, ar.shape[1]))
x = x.map_blocks(function, *args)
states = x.compute()

By default this will use threads; you can use processes in the following way:

from dask.multiprocessing import get
states = x.compute(get=get)
# On newer dask versions the get= keyword is gone; the equivalent is:
# states = x.compute(scheduler="processes")

Pool solution

However, dask is probably overkill for an embarrassingly parallel computation like this; you can get by with a thread pool:

import numpy as np
from multiprocessing.pool import ThreadPool

pool = ThreadPool()

ar = ...
states = np.empty_like(ar)

def f(i):
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))
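To make the pattern concrete, here is a self-contained run of the thread-pool version with a toy row-wise function (np.cumsum, whose output also depends on earlier values in the row, standing in for the real filter); the array shape and names are illustrative only:

```python
import numpy as np
from multiprocessing.pool import ThreadPool

def function(row):
    # Toy stand-in for the real filter: a cumulative sum, whose
    # output depends on earlier values in the row.
    return np.cumsum(row)

ar = np.arange(12, dtype=float).reshape(4, 3)
states = np.empty_like(ar)

def f(i):
    # Each task writes into a distinct row of the shared array,
    # so concurrent writes from threads don't conflict.
    states[i] = function(ar[i])

with ThreadPool() as pool:
    pool.map(f, range(len(ar)))
```

Writing into a shared preallocated array like this is safe here because threads share memory and each task owns one row.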

And you can switch to processes with the following change:

from multiprocessing import Pool
pool = Pool()
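One caveat the answer glosses over: with a process pool the workers get their own copies of memory, so in-place writes to states inside f are lost in the parent. With processes, have the worker return its row and collect the results instead. A minimal sketch (again using np.cumsum as a stand-in for the real function; the worker must be a top-level function so it can be pickled):

```python
import numpy as np
from multiprocessing import Pool

def g(row):
    # Return the result rather than writing into a shared array:
    # process pools don't share the parent's memory, so in-place
    # assignment to states would silently do nothing.
    return np.cumsum(row)

ar = np.arange(12, dtype=float).reshape(4, 3)
with Pool() as pool:
    # Pool.map pickles each row to a worker and collects the
    # returned rows in order; stack them back into one array.
    states = np.array(pool.map(g, ar))
```

The same return-and-collect pattern also works unchanged with ThreadPool, so it is a safe default for both.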

