numpy 数组索引上的多线程迭代 [英] multithreaded iteration over numpy array indices

查看:54
本文介绍了numpy 数组索引上的多线程迭代的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一段代码,它遍历一个三维数组,并根据索引和当前值本身将一个值写入每个单元格:

将 numpy 导入为 npnx = ny = nz = 100数组 = np.zeros((nx, ny, nz))def fun(val, k):# 对索引做一些事情返回 val + (k[0] * k[1] * k[2])使用 np.nditer(array, flags=['multi_index'], op_flags=['readwrite']) 作为它:对于其中的 x:x[...] = fun(x, it.multi_index)

请注意,fun 可能会做一些更复杂的事情,这会占用整个运行时间的大部分时间,并且输入数组的每个轴可能具有不同的长度.

但是,此代码可以在多个线程中运行,因为可以假定 fun 是线程安全的(仅需要当前单元格的值和索引).但是找到一种方法来迭代所有单元格并获得当前索引似乎很难.

一个可能的解决方案可能是

  • 分布式(此代码):

  • 结果

    此方法适用于任意大小的 numpy 数组,并且能够对数组的索引执行操作.它为您提供了对整个 numpy 数组的完全访问权限,因此它还可用于执行不同类型的统计分析,而不会更改数组.

    从时间上可以看出,对于小数据形状,分布式版本没有或几乎没有优势,因为创建过程的额外复杂性.然而,对于大量数据,它开始变得更有效.

    我只在计算时间的短暂延迟上对其计时(简单有趣),但在更复杂的计算中,它应该会更快地胜过顺序版本.

    额外

    如果您只对在轴上或沿轴执行的操作感兴趣,这些 numpy 函数可能有助于矢量化您的解决方案,而不是使用多处理:

    I have a piece of code which iterates over a three-dimensional array and writes into each cell a value based on the indices and the current value itself:

    import numpy as np
    nx = ny = nz = 100
    
    array = np.zeros((nx, ny, nz))
    
    def fun(val, k):
        # Do something with the indices
        return val + (k[0] * k[1] * k[2])
    
    with np.nditer(array, flags=['multi_index'], op_flags=['readwrite']) as it:
        for x in it:
            x[...] = fun(x, it.multi_index)
    
    

    Note, that fun might do something more sophisticated, which takes most of the total runtime, and that the input arrays might have different lengths per axis.

    However, this code could run in multiple threads, as fun can be assumed to be threadsafe (Only the value and index of the current cell are required). But finding a method to iterate over all cells and have the current index available seems to be hard.

    A possible solution might be https://stackoverflow.com/a/58012407/446140, where the array is split by the x-axis into chunks and passed to a Pool. However, the solution is not universally applicable and I wonder if there is a more general solution for this problem (which could also work with nD arrays)? The first issue is to split up the 3D array into equally sized chunks. np.array_split can be used, but the offset of each of the splits has to be stored to get the correct indices again.

    解决方案

    An interesting question, with a few possible solutions. As you indicated, it is possible to use np.array_split, but since we are only interested in the indices, we can also use np.unravel_index, which would mean that we only have to loop over all the indices (the size) of the array to get the index.

    Now there are two great ideas for multiprocessing:

    1. Create a (thread safe) shared memory of the array and splitting the indices across the different processes.
    2. Only update the array in a main thread, but provide a copy of the required data to the processes and let them return the value that has to be updated.

    Both solutions will work for any np.ndarray, but have different advantages. Creating a shared memory doesn't create copies, but can have a large insertion penalty if it has to wait on other processes (the computational time, is small compared to the write time.)

    There are probably many more solutions, but I will work out the first solution, where a Shared Memory object is created and a range of indices is provided to every process.

    Required imports:

    import itertools
    import numpy as np
    import multiprocessing as mp
    from multiprocessing import shared_memory
    

    Shared Numpy arrays

    The main problem with applying multiprocessing on np.ndarray's is that memory sharing between processes can be difficult. For this the following class can be used:

    class SharedNumpy:
        __slots__ = ('arr', 'shm', 'name', 'shared',)
    
        def __init__(self, arr: np.ndarray = None):
            if arr is not None:
                self.shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
                self.arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=self.shm.buf)
                self.name = self.shm.name
                np.copyto(self.arr, arr)
    
        def __getattr__(self, item):
            if hasattr(self.arr, item):
                return getattr(self.arr, item)
            raise AttributeError(f"{self.__class__.__name__}, doesn't have attribute {item!r}")
    
        def __str__(self):
            return str(self.arr)
    
        @classmethod
        def from_name(cls, name, shape, dtype):
            memory = cls(arr=None)
            memory.shm = shared_memory.SharedMemory(name)
            memory.arr = np.ndarray(shape, dtype=dtype, buffer=memory.shm.buf)
            memory.name = name
            return memory
    
        @property
        def dtype(self):
            return self.arr.dtype
    
        @property
        def shape(self):
            return self.arr.shape
    

    This makes it possible to create a shared memory object in the main process and then use SharedNumpy.from_name to get it in other processes.

    Simple test

    A quick (non threaded) test would be:

    def simple_test():
        data = np.array(np.zeros((5,) * 2))
    
        mem_primary = SharedNumpy(arr=data)
        mem_second = SharedNumpy.from_name(name=mem_primary.name, shape=data.shape, dtype=data.dtype)
    
        assert mem_primary.name == mem_second.name, "Different memory names"
        assert np.array_equal(mem_primary.arr, mem_second.arr), "Different array values."
    
        mem_primary.arr[2] = 5
        assert np.array_equal(mem_primary.arr, mem_second.arr), "Different array values."
        print("Completed 3/3 tests...")
    

    A threaded test will follow later!

    Distribution

    The next part is focused on providing the processes with the necessary data. In this case we will provide every process with a range of indices that it has to calculate and all the data that is required to load the shared memory.

    The input of this function is a dim the number of numpy axis, and the size, which are the number of elements per axis.

    def distributed(size, dim):
        memory = SharedNumpy(arr=np.zeros((size,) * dim))
        split_size = np.int64(np.ceil(memory.arr.size / mp.cpu_count()))
    
        settings = dict(
                memory=itertools.repeat(memory.name),
                shape=itertools.repeat(memory.arr.shape),
                dtype=itertools.repeat(memory.arr.dtype),
                start=np.arange(mp.cpu_count()),
                num=itertools.repeat(split_size)
        )
    
        with mp.Pool(mp.cpu_count()) as pool:
            pool.starmap(fun, zip(*settings.values()))
    
        print(f"\n\nDone {dim}D, size: {size}, elements: {size ** dim}")
        return memory
    

    Notes:

    • By using starmap instead of map, it is possible to provide multiple input arguments (a list of arguments for every process).
    • itertools.repeat is used to add constants to the starmap
    • By using np.unravel_index, we only need to have a start index and the chunk size per process.
    • The start and num tell the chunks of indices that have to be converted per process, by applying range(start * num, (start + 1) * num).

    Testing

    For the testing I am using different input sizes and dimensions. Since the data increases with the formula sizes ^ dimensions, I limited the test to a size of 128 and 3 dimensions (that is 2,097,152 points, and already start taking quit a bit of time.)

    Code

    • fun

      def fun(name, shape, dtype, start, num):
          memory = SharedNumpy.from_name(name, shape=shape, dtype=dtype)
          for idx in range(start * num, min((start + 1) * num, memory.arr.size)):
              # Do something with the indices
              indices = np.unravel_index([idx], shape)
              memory.arr[indices] += np.product(indices)
          memory.shm.close() # Closes the shared memory for this process.
      

    • Running the example

      if __name__ == '__main__':
          for size in [5, 10, 15]:
              for dim in [1, 2, 3]:
                  memory = distributed(size, dim)
                  print(memory)
                  memory.shm.unlink()
      

    For the OP's code, I used his code with a small addition that I allow the array to have different sizes and dimensions, in any case I use:

    def sequential(size, dim):
        array = np.zeros((size,) * dim)
        ...
    

    And looking at the output array of both codes, will result in the same outcomes.

    Plots

    The code for the graphs have been taken from the reply in:

    With the minor alteration that labels was changed to codes in

    empty_multi_index = pd.MultiIndex(levels=[[], []], codes=[[], []], names=['func', 'result'])
    

    Where the 1d, 2d and 3d reference the dimensions and the input is the size.

    • Sequentially (OP code):

    • Distributed (this code):

    Results

    This method works on an arbitrary sized numpy array, and is able to perform an operation on the indices of the array. It provides you with full access of the whole numpy array, so it can also be used to perform different kind of statistical analysis, which do not change the array.

    From the timings it can be seen that for small data shapes the distributed version has no to little advantages, because of the extra complexity of creating the processes. However for larger amount of data it starts to become more effective.

    I only timed it on short delays in the computational time (simple fun), but on more complex calculations, it should outperform the sequential version much sooner.

    Extra

    If you are only interested in operations that are performed over or along axis, these numpy functions might help to vectorize your solutions instead of using multiprocessing:

    这篇关于numpy 数组索引上的多线程迭代的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆