multithreaded iteration over numpy array indices
Problem description
I have a piece of code which iterates over a three-dimensional array and writes into each cell a value based on the indices and the current value itself:
import numpy as np

nx = ny = nz = 100
array = np.zeros((nx, ny, nz))

def fun(val, k):
    # Do something with the indices
    return val + (k[0] * k[1] * k[2])

with np.nditer(array, flags=['multi_index'], op_flags=['readwrite']) as it:
    for x in it:
        x[...] = fun(x, it.multi_index)
Note that fun might do something more sophisticated which takes most of the total runtime, and that the input arrays might have different lengths per axis.
However, this code could run in multiple threads, as fun can be assumed to be threadsafe (only the value and index of the current cell are required). But finding a method to iterate over all cells that also has the current index available seems to be hard.
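As a side note, one sequential way to have the current index available for every cell (an alternative to nditer, not yet the multithreaded solution being asked for) is np.ndenumerate:

```python
import numpy as np

array = np.zeros((2, 3, 4))

# np.ndenumerate yields (index_tuple, value) pairs for every cell,
# so the index is available without nditer's multi_index machinery.
for idx, val in np.ndenumerate(array):
    array[idx] = val + (idx[0] * idx[1] * idx[2])

# The cell at (1, 2, 3) now holds 0 + 1 * 2 * 3 = 6.
print(array[1, 2, 3])  # 6.0
```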
A possible solution might be https://stackoverflow.com/a/58012407/446140, where the array is split by the x-axis into chunks and passed to a Pool.
However, the solution is not universally applicable and I wonder if there is a more general solution for this problem (which could also work with nD arrays)?
The first issue is to split up the 3D array into equally sized chunks. np.array_split can be used, but the offset of each of the splits has to be stored to get the correct indices again.
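A minimal sketch of that bookkeeping (a hypothetical helper, not code from the answer): split along axis 0 with np.array_split and remember each chunk's offset, so local indices can be mapped back to global ones.

```python
import numpy as np

array = np.zeros((7, 4, 4))
n_chunks = 3

# Split along the first axis; chunks may differ in size when the
# axis length is not divisible by n_chunks (here: 3, 2, 2).
chunks = np.array_split(array, n_chunks, axis=0)

# Store the global offset of each chunk along axis 0.
offsets = np.cumsum([0] + [c.shape[0] for c in chunks[:-1]])

# Local index (i, j, k) in chunk c corresponds to the global
# index (i + offsets[c], j, k) in the original array.
for off, chunk in zip(offsets, chunks):
    print(off, chunk.shape)
```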
An interesting question, with a few possible solutions. As you indicated, it is possible to use np.array_split, but since we are only interested in the indices, we can also use np.unravel_index, which means we only have to loop over all the flat indices (the size) of the array to get each index.
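For example, np.unravel_index converts a flat index into the corresponding multi-dimensional index for a given shape:

```python
import numpy as np

shape = (2, 3, 4)

# Flat index 11 in C order corresponds to (0, 2, 3) for this shape:
# 11 = 0 * 12 + 2 * 4 + 3.
print(np.unravel_index(11, shape))  # (0, 2, 3)

# Looping over all array.size flat indices therefore visits every cell:
indices = [np.unravel_index(i, shape) for i in range(np.prod(shape))]
print(len(indices))  # 24
```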
Now there are two great ideas for multiprocessing:
- Create a (thread safe) shared memory of the array and split the indices across the different processes.
- Only update the array in a main thread, but provide a copy of the required data to the processes and let them return the value that has to be updated.
Both solutions will work for any np.ndarray, but they have different advantages. Creating a shared memory doesn't create copies, but it can have a large insertion penalty if a process has to wait on other processes (when the computational time is small compared to the write time).
There are probably many more solutions, but I will work out the first solution, where a Shared Memory object is created and a range of indices is provided to every process.
Required imports:
import itertools
import numpy as np
import multiprocessing as mp
from multiprocessing import shared_memory
Shared Numpy arrays
The main problem with applying multiprocessing to np.ndarrays is that memory sharing between processes can be difficult. For this the following class can be used:
class SharedNumpy:
    __slots__ = ('arr', 'shm', 'name', 'shared',)

    def __init__(self, arr: np.ndarray = None):
        if arr is not None:
            self.shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
            self.arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=self.shm.buf)
            self.name = self.shm.name
            np.copyto(self.arr, arr)

    def __getattr__(self, item):
        if hasattr(self.arr, item):
            return getattr(self.arr, item)
        raise AttributeError(f"{self.__class__.__name__} doesn't have attribute {item!r}")

    def __str__(self):
        return str(self.arr)

    @classmethod
    def from_name(cls, name, shape, dtype):
        memory = cls(arr=None)
        memory.shm = shared_memory.SharedMemory(name)
        memory.arr = np.ndarray(shape, dtype=dtype, buffer=memory.shm.buf)
        memory.name = name
        return memory

    @property
    def dtype(self):
        return self.arr.dtype

    @property
    def shape(self):
        return self.arr.shape
This makes it possible to create a shared memory object in the main process and then use SharedNumpy.from_name to attach to it in other processes.
Simple test
A quick (non threaded) test would be:
def simple_test():
    data = np.array(np.zeros((5,) * 2))
    mem_primary = SharedNumpy(arr=data)
    mem_second = SharedNumpy.from_name(name=mem_primary.name, shape=data.shape, dtype=data.dtype)
    assert mem_primary.name == mem_second.name, "Different memory names"
    assert np.array_equal(mem_primary.arr, mem_second.arr), "Different array values."
    mem_primary.arr[2] = 5
    assert np.array_equal(mem_primary.arr, mem_second.arr), "Different array values."
    print("Completed 3/3 tests...")
    mem_second.shm.close()    # Detach the second handle.
    mem_primary.shm.close()
    mem_primary.shm.unlink()  # Free the shared memory block.
A threaded test will follow later!
Distribution
The next part is focused on providing the processes with the necessary data. In this case we will provide every process with a range of indices that it has to calculate and all the data that is required to load the shared memory.
The inputs of this function are dim, the number of numpy axes, and size, the number of elements per axis.
def distributed(size, dim):
    memory = SharedNumpy(arr=np.zeros((size,) * dim))
    split_size = np.int64(np.ceil(memory.arr.size / mp.cpu_count()))
    settings = dict(
        memory=itertools.repeat(memory.name),
        shape=itertools.repeat(memory.arr.shape),
        dtype=itertools.repeat(memory.arr.dtype),
        start=np.arange(mp.cpu_count()),
        num=itertools.repeat(split_size)
    )
    with mp.Pool(mp.cpu_count()) as pool:
        pool.starmap(fun, zip(*settings.values()))
    print(f"\n\nDone {dim}D, size: {size}, elements: {size ** dim}")
    return memory
Notes:
- By using starmap instead of map, it is possible to provide multiple input arguments (a list of arguments for every process); also see the starmap docs.
- itertools.repeat is used to add constants to the starmap arguments; also see: zip() in python, how to use static values.
- By using np.unravel_index, we only need a start index and the chunk size per process.
- start and num define the chunk of indices each process has to convert, by applying range(start * num, (start + 1) * num).
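To see what zip(*settings.values()) actually feeds to starmap, here is a small standalone sketch of the same pattern (with toy values instead of the real shared memory settings):

```python
import itertools

n_procs = 3
settings = dict(
    memory=itertools.repeat('shm_name'),   # constant for every process
    start=range(n_procs),                  # varies per process
    num=itertools.repeat(8),               # constant chunk size
)

# zip stops at the shortest iterable (here: start), so the
# itertools.repeat entries never run forever.
args = list(zip(*settings.values()))
print(args)
# [('shm_name', 0, 8), ('shm_name', 1, 8), ('shm_name', 2, 8)]
```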
Testing
For the testing I am using different input sizes and dimensions. Since the amount of data grows as size ** dimensions, I limited the test to a size of 128 and 3 dimensions (that is 2,097,152 points, which already starts taking quite a bit of time).
Code
fun
def fun(name, shape, dtype, start, num):
    memory = SharedNumpy.from_name(name, shape=shape, dtype=dtype)
    for idx in range(start * num, min((start + 1) * num, memory.arr.size)):
        # Do something with the indices
        indices = np.unravel_index([idx], shape)
        memory.arr[indices] += np.prod(indices)
    memory.shm.close()  # Closes the shared memory for this process.
Running the example
if __name__ == '__main__':
    for size in [5, 10, 15]:
        for dim in [1, 2, 3]:
            memory = distributed(size, dim)
            print(memory)
            memory.shm.unlink()
For the OP's code, I used his code with a small addition that allows the array to have different sizes and dimensions; in every case I use:
def sequential(size, dim):
    array = np.zeros((size,) * dim)
    ...
Comparing the output arrays of both versions shows that they produce the same outcome.
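Assuming the fun from the question, the sequential reference can be cross-checked against a closed-form result (a sketch for dim=3 only, since that fun reads three index components):

```python
import numpy as np

def fun(val, k):
    return val + (k[0] * k[1] * k[2])

def sequential(size, dim):
    array = np.zeros((size,) * dim)
    with np.nditer(array, flags=['multi_index'], op_flags=['readwrite']) as it:
        for x in it:
            x[...] = fun(x, it.multi_index)
    return array

# For the fun above, each cell simply holds the product of its indices,
# which can be computed vectorized as a cross-check.
result = sequential(5, 3)
i, j, k = np.indices((5, 5, 5))
assert np.array_equal(result, i * j * k)
print(result[2, 3, 4])  # 24.0
```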
Plots
The code for the graphs has been taken from the reply in:
With the minor alteration that labels was changed to codes in:
empty_multi_index = pd.MultiIndex(levels=[[], []], codes=[[], []], names=['func', 'result'])
Here 1d, 2d and 3d reference the dimensions, and the input is the size.
Sequentially (OP code): [timing plot]
Distributed (this code): [timing plot]
Results
This method works on an arbitrarily sized numpy array and is able to perform an operation on the indices of the array. It provides you with full access to the whole numpy array, so it can also be used to perform different kinds of statistical analysis which do not change the array.
From the timings it can be seen that for small data shapes the distributed version has little to no advantage, because of the extra complexity of creating the processes. However, for larger amounts of data it starts to become more effective.
I only timed it with short delays in the computational time (the simple fun), but for more complex calculations it should outperform the sequential version much sooner.
Extra
If you are only interested in operations that are performed over or along an axis, these numpy functions might help to vectorize your solution instead of using multiprocessing:
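For instance, the fun from the question needs no loop at all; np.indices vectorizes it directly. This is a sketch that applies only when fun can be expressed with numpy operations:

```python
import numpy as np

nx, ny, nz = 4, 5, 6
array = np.zeros((nx, ny, nz))

# np.indices returns one index grid per axis, so the whole update
# from the question becomes a single broadcasted expression.
i, j, k = np.indices(array.shape)
array += i * j * k

print(array[3, 4, 5])  # 60.0
```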