Parallelize loop over numpy rows
Question
I need to apply the same function to every row of a numpy array and store the results in another numpy array.
```python
# states will contain results of function applied to a row in array
states = np.empty_like(array)
for i, ar in enumerate(array):
    states[i] = function(ar, *args)
# do some other stuff on states
```
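As a minimal runnable sketch of the pattern above (the asker's `function` and input array are not given, so a trivial row transform stands in for the real filter here):

```python
import numpy as np

def function(row, offset):
    # hypothetical stand-in for the real per-row filter:
    # shift every element of the row by a constant
    return row - offset

array = np.arange(12, dtype=float).reshape(4, 3)

# states will contain results of function applied to each row
states = np.empty_like(array)
for i, ar in enumerate(array):
    states[i] = function(ar, 1.0)
```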
function does some non-trivial filtering of my data and returns an array depending on whether the conditions are True or False. function can be either pure Python or Cython-compiled. The filtering operations on the rows are complicated and can depend on previous values in the row, which means I can't operate on the whole array in an element-by-element fashion.
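For concreteness, here is a toy example of a row function whose output depends on previous values in the same row (a hypothetical stand-in, not the asker's actual filter), which is why the work cannot be vectorized element-by-element across the whole array:

```python
import numpy as np

def running_max_filter(row):
    # keep an element if it exceeds the running maximum so far,
    # otherwise repeat the previously kept value; each output
    # element depends on earlier values in the same row
    out = np.empty_like(row)
    current = row[0]
    out[0] = current
    for j in range(1, len(row)):
        if row[j] > current:
            current = row[j]
        out[j] = current
    return out

row = np.array([3.0, 1.0, 4.0, 2.0, 5.0])
filtered = running_max_filter(row)
# filtered is [3.0, 3.0, 4.0, 4.0, 5.0]
```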
Is there a way to do something like this in dask, for example?
Answer
Dask solution
You can operate on a dask.array by chunking the array row by row, calling map_blocks, then computing the result:
```python
import dask.array as da

ar = ...
# chunk the array one row at a time
x = da.from_array(ar, chunks=(1, ar.shape[1]))
# apply function to each row-chunk and compute the result
states = x.map_blocks(function, *args).compute()
```
By default this will use threads; you can use processes in the following way:

```python
from dask.multiprocessing import get
states = x.map_blocks(function, *args).compute(get=get)
```

Note that the `get=` keyword is the scheduler interface of older dask releases; in current versions the equivalent is `compute(scheduler='processes')`.
Pool solution

However, dask is probably overkill for an embarrassingly parallel computation like this; you could get by with a thread pool:
```python
from multiprocessing.pool import ThreadPool

pool = ThreadPool()

ar = ...
states = np.empty_like(ar)

def f(i):
    # threads share memory, so writing into states works directly
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))
```
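Putting the thread-pool version together as a self-contained sketch (again with a trivial placeholder standing in for the real `function`):

```python
import numpy as np
from multiprocessing.pool import ThreadPool

def function(row, offset):
    # hypothetical stand-in for the real per-row filter
    return row * 2 + offset

ar = np.arange(12, dtype=float).reshape(4, 3)
states = np.empty_like(ar)

def f(i):
    # threads share the parent's memory, so this write is visible
    states[i] = function(ar[i], 1.0)

pool = ThreadPool()
pool.map(f, range(len(ar)))
pool.close()
pool.join()
```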
And you could switch to processes with the following change:

```python
from multiprocessing import Pool
pool = Pool()
```

Note, however, that worker processes do not share memory with the parent, so the in-place writes to states above would be lost; with a process pool, f should return its result and you should collect the return values of pool.map instead.