Dynamically create a list of shared arrays using python multiprocessing

Question

I'd like to share several numpy arrays between different child processes with python's multiprocessing module. I'd like the arrays to be separately lockable, and I'd like the number of arrays to be dynamically determined at runtime. Is this possible?

In this answer, J.F. Sebastian lays out a nice way to use python's numpy arrays in shared memory while multiprocessing. The array is lockable, which is what I want. I would like to do something very similar, except with a variable number of shared arrays. The number of arrays would be determined at runtime. His example code is very clear and does almost exactly what I want, but I'm unclear how to declare a variable number of such arrays without giving each one of them a hard-coded name like shared_arr_1, shared_arr_2, et cetera. What's the right way to do this?
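A minimal sketch of the core idea, assuming Python 3 and the default `multiprocessing` start method. The shared arrays live in an ordinary list whose length is chosen at runtime, so each buffer is addressed by index instead of by a hard-coded name like `shared_arr_1`. Each `mp.Array` created with the default `lock=True` carries its own lock, so locking one buffer does not block the others. The helper names (`make_shared_arrays`, `as_numpy`, `worker`, `run_demo`) and the tiny shapes are hypothetical, chosen only for illustration.

```python
import ctypes
import multiprocessing as mp

import numpy as np


def make_shared_arrays(n_arrays, shape):
    """Allocate n_arrays lockable uint16 shared buffers; n_arrays is a runtime value."""
    size = int(np.prod(shape))
    # mp.Array(..., lock=True) is the default: every array gets its own RLock.
    return [mp.Array(ctypes.c_uint16, size) for _ in range(n_arrays)]


def as_numpy(shared, shape):
    """Return a numpy *view* (not a copy) of the shared buffer."""
    return np.frombuffer(shared.get_obj(), dtype=np.uint16).reshape(shape)


def worker(shared_arrays, index, shape, value):
    # Lock only the buffer this worker writes to; the others stay available.
    with shared_arrays[index].get_lock():
        as_numpy(shared_arrays[index], shape).fill(value)


def run_demo(n_arrays=4, shape=(2, 3)):
    arrays = make_shared_arrays(n_arrays, shape)
    procs = [mp.Process(target=worker, args=(arrays, i, shape, i + 1))
             for i in range(n_arrays)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Copy out of shared memory so the results outlive the buffers.
    return [as_numpy(a, shape).copy() for a in arrays]


if __name__ == "__main__":
    for i, arr in enumerate(run_demo()):
        print(i, arr.ravel().tolist())
```

Because `np.frombuffer` returns a view, the values a child writes through it are visible to the parent after `join()`. No explicit copy back is needed.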

Recommended answer

Turns out this was easier than I thought! Following J.F. Sebastian's encouragement, here's my crack at an answer:

import time
import ctypes
import logging
import multiprocessing as mp
import numpy as np

try:
    import queue as Queue  # Python 3 name of the module
except ImportError:
    import Queue  # Python 2

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    data_pipeline = Image_Data_Pipeline(
        num_data_buffers=5,
        buffer_shape=(60, 256, 512))
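    # time.clock() measures CPU time on Unix and was removed in Python 3.8;
    # time.time() measures wall-clock time on every platform.
    start = time.time()
    data_pipeline.load_buffers(data_pipeline.num_data_buffers)
    end = time.time()
    data_pipeline.close()
    print("Elapsed time: %s" % (end - start))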


class Image_Data_Pipeline:
    def __init__(self, num_data_buffers, buffer_shape):
        """
        Allocate a bunch of 16-bit buffers for image data
        """
        self.num_data_buffers = num_data_buffers
        self.buffer_shape = buffer_shape
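        pix_per_buf = int(np.prod(buffer_shape))
        # One separately lockable shared buffer per slot; the count is a runtime value
        self.data_buffers = [mp.Array(ctypes.c_uint16, pix_per_buf)
                             for b in range(num_data_buffers)]
        # list() so pop()/append() also work on Python 3, where range() is lazy
        self.idle_data_buffers = list(range(num_data_buffers))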

        """
        Launch the child processes that make up the pipeline
        """
        self.camera = Data_Pipeline_Process(
            target=child_process, name='Camera',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape)
        self.display_prep = Data_Pipeline_Process(
            target=child_process, name='Display Prep',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.camera.output_queue)
        self.file_saving = Data_Pipeline_Process(
            target=child_process, name='File Saving',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.display_prep.output_queue)
        return None

    def load_buffers(self, N, timeout=0):
        """
        Feed the pipe!
        """
        for i in range(N):
            self.camera.input_queue.put(self.idle_data_buffers.pop())

        """
        Wait for the buffers to idle. Here would be a fine place to
        feed them back to the pipeline, too.
        """
        while True:
            try:
                self.idle_data_buffers.append(
                    self.file_saving.output_queue.get_nowait())
                info("Buffer %i idle"%(self.idle_data_buffers[-1]))
            except Queue.Empty:
                time.sleep(0.01)
            if len(self.idle_data_buffers) >= self.num_data_buffers:
                break
        return None

    def close(self):
        self.camera.input_queue.put(None)
        self.display_prep.input_queue.put(None)
        self.file_saving.input_queue.put(None)
        self.camera.child.join()
        self.display_prep.child.join()
        self.file_saving.child.join()


class Data_Pipeline_Process:
    def __init__(
        self,
        target,
        name,
        data_buffers,
        buffer_shape,
        input_queue=None,
        output_queue=None,
        ):
        if input_queue is None:
            self.input_queue = mp.Queue()
        else:
            self.input_queue = input_queue

        if output_queue is None:
            self.output_queue = mp.Queue()
        else:
            self.output_queue = output_queue

        self.command_pipe = mp.Pipe() #For later, we'll send instrument commands

        self.child = mp.Process(
            target=target,
            args=(name, data_buffers, buffer_shape,
                  self.input_queue, self.output_queue, self.command_pipe),
            name=name)
        self.child.start()
        return None

def child_process(
    name,
    data_buffers,
    buffer_shape,
    input_queue,
    output_queue,
    command_pipe):
    if name == 'Display Prep':
        display_buffer = np.empty(buffer_shape, dtype=np.uint16)
    while True:
        try:
            process_me = input_queue.get_nowait()
        except Queue.Empty:
            time.sleep(0.01)
            continue
        if process_me is None:
            break #We're done
        else:
            info("start buffer %i"%(process_me))
            with data_buffers[process_me].get_lock():
                a = np.frombuffer(data_buffers[process_me].get_obj(),
                                  dtype=np.uint16)
                if name == 'Camera':
                    """
                    Fill the buffer with data (eventually, from the
                    camera, dummy data for now)
                    """
                    a.fill(1)
                elif name == 'Display Prep':
                    """
                    Process the 16-bit image into a display-ready
                    8-bit image. Fow now, just copy the data to a
                    similar buffer.
                    """
                    display_buffer[:] = a.reshape(buffer_shape)
                elif name == 'File Saving':
                    """
                    Save the data to disk.
                    """
                    a.tofile('out.raw')
            info("end buffer %i"%(process_me))
            output_queue.put(process_me)
    return None

if __name__ == '__main__':
    main()

Background: This is the skeleton of a data-acquisition pipeline. I want to acquire data at a very high rate, process it for on-screen display, and save it to disk. I don't ever want display rate or disk rate to limit acquisition, which is why I think using separate child processes in individual processing loops is appropriate.
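The pipeline idea can be reduced to a small sketch, assuming Python 3. The numpy data never travels through the queues; only integer buffer indices (paired here with a frame number) do. The main process hands each buffer back to the first stage as soon as the last stage releases it, which is the recycling the comment in `load_buffers` alludes to. The stage functions, `SHAPE`, and the buffer and frame counts are hypothetical. Unlike the answer's code, the stages use a blocking `get()` rather than polling `get_nowait()` with `time.sleep`.

```python
import ctypes
import multiprocessing as mp

import numpy as np

SHAPE = (4, 8)  # hypothetical frame shape, kept tiny for the demo


def view(shared):
    """numpy view onto one shared uint16 buffer."""
    return np.frombuffer(shared.get_obj(), dtype=np.uint16).reshape(SHAPE)


def acquire(buffers, in_q, out_q):
    # Stand-in for the camera stage: write a "frame" into the given buffer.
    while True:
        item = in_q.get()  # blocks until work or the shutdown signal arrives
        if item is None:
            out_q.put(None)  # forward the shutdown signal downstream
            break
        index, frame = item
        with buffers[index].get_lock():
            view(buffers[index]).fill(frame)
        out_q.put((index, frame))


def save(buffers, in_q, out_q):
    # Stand-in for the file-saving stage: read the frame back out.
    while True:
        item = in_q.get()
        if item is None:
            break
        index, frame = item
        with buffers[index].get_lock():
            total = int(view(buffers[index]).sum())
        out_q.put((index, frame, total))


def run_pipeline(num_buffers=3, num_frames=10):
    size = SHAPE[0] * SHAPE[1]
    buffers = [mp.Array(ctypes.c_uint16, size) for _ in range(num_buffers)]
    q_in, q_mid, q_done = mp.Queue(), mp.Queue(), mp.Queue()
    stages = [mp.Process(target=acquire, args=(buffers, q_in, q_mid)),
              mp.Process(target=save, args=(buffers, q_mid, q_done))]
    for p in stages:
        p.start()

    idle = list(range(num_buffers))
    results = []
    next_frame = 0
    while len(results) < num_frames:
        # Feed every idle buffer to the head of the pipeline.
        while idle and next_frame < num_frames:
            q_in.put((idle.pop(), next_frame))
            next_frame += 1
        index, frame, total = q_done.get()
        idle.append(index)  # the last stage is done with it: recycle
        results.append((frame, total))

    q_in.put(None)
    for p in stages:
        p.join()
    return sorted(results)


if __name__ == "__main__":
    for frame, total in run_pipeline():
        print(frame, total)
```

A buffer is only re-queued after the last stage has released it, so `num_buffers` bounds how many frames can be in flight at once. When there are fewer buffers than frames, acquisition waits for saving to free one.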

Here's typical output of the dummy program:

C:\code\instrument_control>c:\Python27\python.exe test.py
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[[INFO/Camera] child process calling self.run()
INFO/Display Prep] child process calling self.run()
[INFO/Camera] start buffer 4
[INFO/File Saving] child process calling self.run()
[INFO/Camera] end buffer 4
[INFO/Camera] start buffer 3
[INFO/Camera] end buffer 3
[INFO/Camera] start buffer 2
[INFO/Display Prep] start buffer 4
[INFO/Camera] end buffer 2
[INFO/Camera] start buffer 1
[INFO/Camera] end buffer 1
[INFO/Camera] start buffer 0
[INFO/Camera] end buffer 0
[INFO/Display Prep] end buffer 4
[INFO/Display Prep] start buffer 3
[INFO/File Saving] start buffer 4
[INFO/Display Prep] end buffer 3
[INFO/Display Prep] start buffer 2
[INFO/File Saving] end buffer 4
[INFO/File Saving] start buffer 3
[INFO/MainProcess] Buffer 4 idle
[INFO/Display Prep] end buffer 2
[INFO/Display Prep] start buffer 1
[INFO/File Saving] end buffer 3
[INFO/File Saving] start buffer 2
[INFO/MainProcess] Buffer 3 idle
[INFO/Display Prep] end buffer 1
[INFO/Display Prep] start buffer 0
[INFO/File Saving] end buffer 2
[INFO/File Saving] start buffer 1
[[INFO/MainProcess] Buffer 2 idle
INFO/Display Prep] end buffer 0
[INFO/File Saving] end buffer 1
[INFO/File Saving] start buffer 0
[INFO/MainProcess] Buffer 1 idle
[INFO/File Saving] end buffer 0
[INFO/MainProcess] Buffer 0 idle
[INFO/Camera] process shutting down
[INFO/Camera] process exiting with exitcode 0
[INFO/Display Prep] process shutting down
[INFO/File Saving] process shutting down
[INFO/Display Prep] process exiting with exitcode 0
[INFO/File Saving] process exiting with exitcode 0
Elapsed time: 0.263240348548
[INFO/MainProcess] process shutting down

C:\code\instrument_control>

It seems to do what I want: the data gets processed for display and saved to disk without interfering with the acquisition rate.
