Why is there no speed-up when using Python's multiprocessing for an embarrassingly parallel problem within a for-loop, with shared numpy data?


Problem description


I want to speed up an embarrassingly parallel problem related to Bayesian inference. The aim is to infer coefficients u for a set of images x, given a matrix A, such that X = A*U. X has dimensions m x n, A m x p, and U p x n. For each column of X, one has to infer the optimal corresponding column of the coefficients U. In the end, this information is used to update A. I use m = 3000, p = 1500 and n = 100. So, as it is a linear model, the inference of the coefficient matrix U consists of n independent calculations. Thus, I tried to work with the multiprocessing module of Python, but there is no speed-up.
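A toy check of the dimensions (scaled-down sizes, random data, purely illustrative) shows why the n inference problems decouple: each column of X is determined by the matching column of U alone.

import numpy as np

# Scaled-down stand-ins for m = 3000, p = 1500, n = 100
m, p, n = 30, 15, 10
A = np.random.randn(m, p)
U = np.random.randn(p, n)
X = np.dot(A, U)

# Column k of X only depends on column k of U, so the n
# coefficient inferences are independent of each other
k = 3
print np.allclose(X[:, k], np.dot(A, U[:, k]))   # True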

This is what I did:


The main structure, without parallelization, is:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

The multiprocessing implementation:


I tried to implement multiprocessing. I have an 8-core machine that I can use.

  1. There are 3 for-loops. The only one that seems to be "parallelizable" is the third one, where the coefficients are inferred:
    • Generate a Queue and stack the iteration-numbers from 0 to batch_size-1 into the Queue
    • Generate 8 processes, and let them work through the Queue


So, I replaced this third loop with the following:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for k in xrange(batch_size):
            work_queue.put(k)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue, U_mp, A, X))
                     for _ in range(num_cpu)]

        for proc in processes:
            proc.start()
            print proc.pid
        for proc in processes:
            proc.join()

And this is the Wrap_mp class:

class Wrap_mp(object):
    """ Wrapper around multiprocessing.Array to share an array across
        processes. Store the array as a multiprocessing.Array, but compute
        with it as a numpy.ndarray.
    """

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()
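The helpers ndarray_to_shmem and shmem_as_ndarray are not shown in the question. A minimal sketch of what they might look like, assuming a RawArray-backed buffer and a small dtype table (both are assumptions, not the original code):

import ctypes
import multiprocessing as mp
import numpy as np

# Map a few common numpy dtypes onto ctypes types for the shared buffer
_DTYPE_TO_CTYPE = {
    np.dtype(np.float64): ctypes.c_double,
    np.dtype(np.float32): ctypes.c_float,
    np.dtype(np.int64): ctypes.c_int64,
    np.dtype(np.int32): ctypes.c_int32,
}

def ndarray_to_shmem(arr):
    """Copy a numpy array into an unlocked shared-memory buffer."""
    shmem = mp.RawArray(_DTYPE_TO_CTYPE[np.dtype(arr.dtype)], arr.size)
    np.frombuffer(shmem, dtype=arr.dtype)[:] = arr.ravel()
    return shmem

def shmem_as_ndarray(data, dtype):
    """View the shared buffer as a flat numpy array, without copying."""
    return np.frombuffer(data, dtype=dtype)

A RawArray carries no lock, which matches how the question uses U: every worker writes to a disjoint column, so no synchronization around the writes is needed.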

And here is the infer_coefficients_mp function:

def infer_coefficients_mp(work_queue, U_mp, A, X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:, index]
            U = U_mp.asarray()
            # Warm start for this column, theta = A.T x, as in the serial version
            theta = np.dot(A.T, x)

            # Infer the coefficients of the column index
            U[:, index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

        except Empty:
            break

Now the questions are the following:

  1. For the given data dimensions, the multiprocessing version is not faster than the single-threaded version.
  2. The process IDs increase with every iteration. Does this mean that new processes are constantly being spawned? Doesn't this generate a huge overhead? How can I avoid that? Is it possible to create 8 different processes for the whole for-loop and just update them with the data?
  3. Does the way I share the coefficients U across the processes slow the calculation down? Is there another, better way to do it?
  4. Would a pool of processes be better?


I am really thankful for any sort of help! I started working with Python a month ago and am pretty lost now.


Recommended answer


Every time you create a Process you are creating a new process. If you're doing that within your for loop, then yes, you are starting new processes every time through the loop. It sounds like what you want to do is initialize your Queue and Processes outside of the loop, then fill the Queue inside the loop.
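A rough sketch of that suggestion, reusing the names from the question's own code (Crwlasso_cd, Wrap_mp, num_cpu, batch_size, and so on); the worker function, the None sentinel and the done-counting queue are illustrative choices, not the only way to do it:

import numpy as np
from multiprocessing import Process, Queue

def worker(task_queue, done_queue, U_mp, A, X):
    # Keep pulling column indices until the None sentinel arrives
    for index in iter(task_queue.get, None):
        x = X[:, index]
        U = U_mp.asarray()
        theta = np.dot(A.T, x)
        U[:, index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())
        done_queue.put(index)          # tell the parent this column is finished

task_queue, done_queue = Queue(), Queue()

# Start the workers once, before the training loop
workers = [Process(target=worker, args=(task_queue, done_queue, U_mp, A, X))
           for _ in range(num_cpu)]
for w in workers:
    w.start()

for t in xrange(start_iter, niter):
    # Fill the queue inside the loop ...
    for k in xrange(batch_size):
        task_queue.put(k)
    # ... and wait until the whole batch has been processed
    for _ in xrange(batch_size):
        done_queue.get()
    # update A here, as in the serial version

# Shut the workers down at the very end
for w in workers:
    task_queue.put(None)
for w in workers:
    w.join()

One caveat: with long-lived workers, A (and X, if it changes per batch) would also have to live in shared memory, for example wrapped in Wrap_mp as well, because the copies the workers inherited at start-up will not see the updates the parent makes after each iteration.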


I've used multiprocessing.Pool before, and it's useful, but it doesn't offer much over what you've already implemented with a Queue.
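For comparison, a Pool-based version of the inner loop might look roughly like this (illustrative only; infer_one_column is a made-up helper, and it assumes a fork-based platform where the workers see X and A as inherited globals):

import numpy as np
from multiprocessing import Pool

def infer_one_column(index):
    # X and A are module-level arrays inherited by the workers at fork time
    x = X[:, index]
    theta = np.dot(A.T, x)
    return index, Crwlasso_cd(x.copy(), A, theta=theta.copy())

pool = Pool(processes=num_cpu)
for index, column in pool.map(infer_one_column, range(batch_size)):
    U[:, index] = column
pool.close()
pool.join()

Because A changes after every iteration, the Pool would have to be recreated per iteration (or A shipped along with every task), so it really does not buy much over the hand-rolled Queue version.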
