Fast Queue of read-only numpy arrays

Problem description

I have a multiprocessing job where I'm queuing read-only numpy arrays as part of a producer-consumer pipeline.

Currently they're being pickled, because that is the default behaviour of multiprocessing.Queue, and it slows down performance.

Is there any pythonic way to pass references to shared memory instead of pickling the arrays?

Unfortunately the arrays are being generated after the consumer is started, and there is no easy way around that. (So the global variable approach would be ugly...).

[Note that in the following code we are not expecting h(x0) and h(x1) to be computed in parallel. Instead we see h(x0) and g(h(x1)) computed in parallel (like pipelining in a CPU).]

from multiprocessing import Process, Queue
import numpy as np

class __EndToken(object):
    # sentinel marking the end of the stream
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            # push each result onto the queue, then signal completion
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer, args=(f_xs, q))
            consumer_process.start()
            # yield results as they arrive, until the sentinel appears
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

if __name__ == "__main__":
    rs = f(g(h(xs())))
    for r in rs:
        print r

Answer

Sharing memory between threads or processes

Use threading instead of multiprocessing

Since you're using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here's a version of your code, tweaked to use threading.Thread and Queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. This passes a numpy ndarray via a queue without pickling it. On my computer, this runs about 3 times faster than your code. (However, it's only about 20% faster than the serial version of your code. I have suggested some other approaches further down.)
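
If you want to verify that numpy releases the GIL on your machine, here is a minimal standalone sketch (Python 2 syntax to match the rest of this answer; the array size and loop count are illustrative, not from the original benchmarks):

import threading, time
import numpy as np

a = np.random.uniform(0, 1, (2000, 2000))

def work():
    # matrix products run in C with the GIL released, so threads can overlap
    for _ in range(5):
        a.dot(a)

start = time.time()
work(); work()
print "two serial runs:   %.1fs" % (time.time() - start)

start = time.time()
threads = [threading.Thread(target=work) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print "two threaded runs: %.1fs" % (time.time() - start)

On a multi-core machine the threaded pair should finish in noticeably less time than the serial pair.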

from threading import Thread
from Queue import Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Thread(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs)  # 12.2s

Store numpy arrays in shared memory

Another option, close to what you requested, would be to continue using the multiprocessing package, but pass data between processes using arrays stored in shared memory. The code below creates a new ArrayQueue class to do that. The ArrayQueue object should be created before spawning subprocesses. It creates and manages a pool of numpy arrays backed by shared memory. When a result array is pushed onto the queue, ArrayQueue copies the data from that array into an existing shared-memory array, then passes the id of the shared-memory array through the queue. This is much faster than sending the whole array through the queue, since it avoids pickling the arrays. This has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e., you run a lot of python code in the functions).

from multiprocessing import Process, Queue, Array
import numpy as np

class ArrayQueue(object):
    def __init__(self, template, maxsize=0):
        if type(template) is not np.ndarray:
            raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.')
        if maxsize == 0:
            # this queue cannot be infinite, because it will be backed by real objects
            raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.')

        # find the size and data type for the arrays
        # note: every ndarray put on the queue must be this size
        self.dtype = template.dtype
        self.shape = template.shape
        self.byte_count = len(template.data)

        # make a pool of numpy arrays, each backed by shared memory, 
        # and create a queue to keep track of which ones are free
        self.array_pool = [None] * maxsize
        self.free_arrays = Queue(maxsize)
        for i in range(maxsize):
            buf = Array('c', self.byte_count, lock=False)
            self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
            self.free_arrays.put(i)

        self.q = Queue(maxsize)

    def put(self, item, *args, **kwargs):
        if type(item) is np.ndarray:
            if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count:
                # get the ID of an available shared-memory array
                id = self.free_arrays.get()
                # copy item to the shared-memory array
                self.array_pool[id][:] = item
                # put the array's id (not the whole array) onto the queue
                new_item = id
            else:
                raise ValueError(
                    'ndarray does not match type or shape of template used to initialize ArrayQueue'
                )
        else:
            # not an ndarray
            # put the original item on the queue (as a tuple, so we know it's not an ID)
            new_item = (item,)
        self.q.put(new_item, *args, **kwargs)

    def get(self, *args, **kwargs):
        item = self.q.get(*args, **kwargs)
        if type(item) is tuple:
            # unpack the original item
            return item[0]
        else:
            # item is the id of a shared-memory array
            # copy the array
            arr = self.array_pool[item].copy()
            # put the shared-memory array back into the pool
            self.free_arrays.put(item)
            return arr

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            # template must match the dtype and shape of the arrays in the pipeline
            q = ArrayQueue(template=np.zeros((500, 2000)), maxsize=buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs()))))   # 13.5s
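
For clarity, here is a minimal standalone round trip through ArrayQueue (assuming the class definition above; the template shape and test values are just illustrative):

template = np.zeros((500, 2000))
aq = ArrayQueue(template, maxsize=4)

arr = np.random.uniform(0, 1, (500, 2000))
aq.put(arr)                # copies arr into a pooled shared-memory slot, queues its id
out = aq.get()             # copies the data back out and returns the slot to the pool
assert (out == arr).all()

aq.put("done")             # non-array items are wrapped in a tuple and passed through
assert aq.get() == "done"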

Parallel processing of samples instead of functions

The code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version shown below). That is because each function is run in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran %time print sum(1 for x in xs()).
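
To see this on your own machine, time the generator by itself (IPython; per the numbers above, this should land close to the 12.2s of the pipelined run):

%time print sum(1 for x in xs())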

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may be distributed better among processors, and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code below (both threading and multiprocessing versions are shown):

import multiprocessing
import threading, Queue
import numpy as np

def f(x):
    return x + 1.0

def g(x):
    return x * 3

def h(x):
    return x * x

def final(i):
    return f(g(h(x(i))))

def final_sum(i):
    return f(g(h(x(i)))).sum()

def x(i):
    # produce sample number i
    return np.random.uniform(0, 1, (500, 2000))

def rs_serial(func, n):
    for i in range(n):
        yield func(i)

def rs_parallel_threaded(func, n):
    todo = range(n)
    q = Queue.Queue(2*n_workers)

    def worker():
        while True:
            try:
                # the global interpreter lock ensures only one thread does this at a time
                i = todo.pop()
                q.put(func(i))
            except IndexError:
                # none left to do
                q.put(None)
                break

    threads = []
    for j in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon=False
        threads.append(t)   # in case it's needed later
        t.start()

    # each worker puts one None sentinel when the todo list is empty;
    # keep draining until every worker has finished, so no results are lost
    finished = 0
    while finished < n_workers:
        x = q.get()
        if x is None:
            finished += 1
        else:
            yield x

def rs_parallel_mp(func, n):
    pool = multiprocessing.Pool(n_workers)
    return pool.imap_unordered(func, range(n))

n_workers = 4
n_samples = 1000

print "serial:"  # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:"  # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))

print "mp return arrays:"  # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:"  # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))

The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version. That's not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?

The multiprocessing version performs significantly faster than your original multiprocessing code, primarily because all the functions get chained together in a single process, rather than queueing (and pickling) intermediate results. However, it is still slower than the serial version because all the result arrays have to get pickled (in the worker process) and unpickled (in the main process) before being returned by imap_unordered. However, if you can arrange it so that your pipeline returns aggregate results instead of the complete arrays, then you can avoid the pickling overhead, and the multiprocessing version is fastest: about 43% faster than the serial version.

OK, for the sake of completeness, here's a version of the second example that uses multiprocessing with your original generator functions instead of the finer-scale functions shown above. This uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-scale functions, and this method can get you up to a 54% speedup vs. the serial version shown above. However, this option is only available if you don't need to return the full arrays from the worker functions.

import multiprocessing, itertools, math
import numpy as np

def f(xs):
    for x in xs:
        yield x + 1.0

def g(xs):
    for x in xs:
        yield x * 3

def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

def final():
    return f(g(h(xs())))

def final_sum():
    for x in f(g(h(xs()))):
        yield x.sum()

def get_chunk(args):
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list. 
    This runs in a worker process and does all the computation."""
    return list(itertools.islice(args[0](), args[1]))

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""

    pool = multiprocessing.Pool(n_workers)

    # how many chunks will be needed to yield at least max_items items?
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))

    # generate a suitable series of arguments for get_chunk()
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks)

    # chunk_gen will yield a series of chunks (lists of results) from the generator function, 
    # totaling n_chunks * chunk_size items (which is >= max_items)
    chunk_gen = pool.imap_unordered(get_chunk, args_list)

    # parallel_gen flattens the chunks, and yields individual items
    parallel_gen = itertools.chain.from_iterable(chunk_gen)

    # limit the output to max_items items 
    return itertools.islice(parallel_gen, max_items)


# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:"  # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:"  # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))


# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:"  # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:"  # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))
