Filling a matrix using parallel processing in Julia


Problem description


I'm trying to speed up the solution time for a dynamic programming problem in Julia (v. 0.5.0), via parallel processing. The problem involves choosing the optimal values for every element of a 1073 x 19 matrix at every iteration, until successive matrix differences fall within a tolerance. I thought that, within each iteration, filling in the values for each element of the matrix could be parallelized. However, I'm seeing a huge performance degradation using SharedArray, and I'm wondering if there's a better way to approach parallel processing for this problem.


I construct the arguments for the function below:

    est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

    r          = 0.015
    tau        = 0.35
    rho        =est_params[1]
    sigma      =est_params[2]
    delta      = 0.15
    gamma      =est_params[3]
    a_capital  =est_params[4]
    lambda1    =est_params[5]
    lambda2    =est_params[6]
    s          =est_params[7]
    theta      =est_params[8]
    mu         =est_params[9]
    p_bar_k_ss  =est_params[10]
    beta  = (1+r)^(-1)
    sigma_range = 4
    gz = 19 
    gp = 29 
    gk = 37 

    lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
    z=exp(lnz)

    gk_m = fld(gk,2)
    # Need to add mu somewhere to k_ss
    k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
    k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
    insert!(k,gk_m+1,k_ss)
    sort!(k)
    p_bar=p_bar_k_ss*k_ss
    p = collect(linspace(-p_bar/2,p_bar,gp))

    #Tauchen
    N     = length(z)
    Z     = zeros(N,1)
    Zprob = zeros(Float32,N,N)

    Z[N]  = lnz[length(z)]
    Z[1]  = lnz[1]

    zstep = (Z[N] - Z[1]) / (N - 1)

    for i=2:(N-1)
        Z[i] = Z[1] + zstep * (i - 1)
    end

    for a = 1 : N
        for b = 1 : N
          if b == 1
              Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
          elseif b == N
              Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
          else
              Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                           0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
          end
        end
    end
    # Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
    # Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
    Zcumprob=zeros(Float32,gz,gz)
    # 2 in cumsum! denotes the 2nd dimension, i.e. columns
    cumsum!(Zcumprob, Zprob,2)


    gm = gk * gp

    control=zeros(gm,2)
    for i=1:gk
        control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
        control[(1+gp*(i-1)):(gp*i),2]=p
    end
    endog=copy(control)

    E=Array(Float32,gm,gm,gz)
    for h=1:gm
       for m=1:gm
           for j=1:gz
             # set the nonzero net debt indicator
              if endog[h,2]<0
                 p_ind=1

              else
                 p_ind=0
              end

               # set the investment indicator
                if (control[m,1]-(1-delta)*endog[h,1])!=0
                   i_ind=1
                else
                   i_ind=0
                end

                E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                    delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                     (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                    s*endog[h,2]*p_ind
                elem = E[m,h,j]
                if E[m,h,j]<0
                    E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
                else
                    E[m,h,j]=elem
                end
            end
        end
     end


I then constructed the function with serial processing. The two for loops iterate through each element to find the largest value in a 1073-element array (= the gm scalar argument in the function):

function dynam_serial(E,gm,gz,beta,Zprob)
    v           = Array(Float32,gm,gz )
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

    Tv          = Array(Float32,gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'

      for h=1:gm
        for j=1:gz
          Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
        end
      end

      diff = maxabs(Tv - v)
      v=copy(Tv)
    end
end

Timing this, I get:

@time dynam_serial(E,gm,gz,beta,Zprob)

> 106.880008 seconds (91.70 M allocations: 203.233 GB, 15.22% gc time)


Now, I try using Shared Arrays to benefit from parallel processing. Note that I reconfigured the iteration so that I only have one for loop, rather than two. I also use v=deepcopy(Tv); otherwise, v is copied as an Array object, rather than a SharedArray:

function dynam_parallel(E,gm,gz,beta,Zprob)
    v           = SharedArray(Float32,(gm,gz),init = S -> S[Base.localindexes(S)] = myid() )
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'
      Tv          = SharedArray(Float32,gm,gz,init = S -> S[Base.localindexes(S)] = myid() )

      @sync @parallel for hj=1:(gm*gz)
         j=cld(hj,gm)
         h=mod(hj,gm)
         if h==0;h=gm;end;

         @async Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
      end

      diff = maxabs(Tv - v)
      v=deepcopy(Tv)
    end
end
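As an aside, the cld/mod arithmetic above is just the standard column-major index mapping, so inside the loop the pair (h, j) could equivalently be recovered with ind2sub:

    h, j = ind2sub((gm, gz), hj)   # same (h, j) as the cld/mod lines above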


Timing the parallel version, and using a 4-core 2.5 GHz i7 processor with 16GB of memory, I get:

addprocs(3)
@time dynam_parallel(E,gm,gz,beta,Zprob)

> 164.237208 seconds (2.64 M allocations: 201.812 MB, 0.04% gc time)


Am I doing something incorrect here? Or is there a better way to approach parallel processing in Julia for this particular problem? I've considered using Distributed Arrays, but it's difficult for me to see how to apply them to the present problem.
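One decomposition that avoids SharedArray altogether, sketched here with the names from above and untested on this problem, is to split the Bellman update by column j and hand each column to a worker with pmap (assuming workers have already been added with addprocs; column_max is a made-up helper):

    # Sketch only: each task receives one gm x gm slice of E plus one column of
    # exp_v, and returns one column of Tv. The slices are serialized to the
    # workers on every call, so per-iteration communication is not free.
    @everywhere function column_max(Ej, expv_j, beta)
        out = Array{Float32}(size(Ej, 2))
        for h = 1:size(Ej, 2)
            out[h] = findmax(Ej[:, h] + beta*expv_j)[1]
        end
        return out
    end

    # Inside the while loop, after exp_v = v*Zprob':
    slices = [(E[:, :, j], exp_v[:, j]) for j = 1:gz]
    cols   = pmap(t -> column_max(t[1], t[2], beta), slices)
    Tv     = hcat(cols...)   # gm x gz, same shape as the serial Tv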


UPDATE: Per @DanGetz and his helpful comments, I turned instead to trying to speed up the serial processing version. I was able to get performance down to 53.469780 seconds (67.36 M allocations: 103.419 GiB, 19.12% gc time) through:


1) Upgrading to 0.6.0 (saved about 25 seconds), which includes the helpful @views macro.


2) Preallocating the main array I'm trying to fill in (Tv), per the section on Pre-allocating Outputs in the Julia Performance Tips: https://docs.julialang.org/en/latest/manual/performance-tips/ (saved another 25 or so seconds). A minimal illustration of both changes follows this list.
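Here is that illustration, on toy arrays rather than the problem data (A, r, and scaled_col! below are made up for the example):

    A = rand(Float32, 5, 5)
    c_copy = A[:, 1]             # plain indexing allocates a copy of the column
    c_view = @views A[:, 1]      # @views turns the slice into a non-allocating SubArray

    # Pre-allocating the output: write into r in place instead of returning a new array.
    function scaled_col!(r, A, j, beta)
        @views r .= beta .* A[:, j]
        return r
    end
    r = Array{Float32}(5)
    scaled_col!(r, A, 2, 0.5f0)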


The biggest remaining slow-down seems to be coming from the add_vecs function, which sums together subarrays of two larger matrices. I've tried devectorizing and using BLAS functions, but haven't been able to produce better performance.


In any event, the improved code for dynam_serial is below:

function add_vecs(r::Array{Float32},h::Int,j::Int,E::Array{Float32},exp_v::Array{Float32},beta::Float32)

  @views r=E[:,h,j] + beta*exp_v[:,j]
  return r
end


function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
    v           = Array{Float32}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    Tv          = Array{Float32}(gm,gz)
    r           = Array{Float32}(gm)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'

      for h=1:gm
        for j=1:gz
          @views Tv[h,j]=findmax(add_vecs(r,h,j,E,exp_v,beta))[1]
        end
      end

      diff = maximum(abs,Tv - v)
      v=copy(Tv)
    end
    return Tv
end

Answer


If add_vecs seems to be the critical function, writing an explicit for loop could offer further optimization. How does the following benchmark for you?

function add_vecs!(r::Array{Float32},h::Int,j::Int,E::Array{Float32},
                  exp_v::Array{Float32},beta::Float32)
    @inbounds for i=1:size(E,1)
        r[i]=E[i,h,j] + beta*exp_v[i,j]
    end
    return r
end
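To compare the two versions in isolation, a rough harness could look like the following (hypothetical code: sweep! and the random exp_v stand-in are made up for the benchmark; inside dynam_serial, exp_v is v*Zprob'):

    # Drive either add_vecs or add_vecs! over every (h, j) pair once.
    function sweep!(f!, r, E, exp_v, beta, gm, gz)
        for h = 1:gm, j = 1:gz
            f!(r, h, j, E, exp_v, beta)
        end
    end

    exp_v = rand(Float32, gm, gz)                            # stand-in for v*Zprob'
    r     = Array{Float32}(gm)
    sweep!(add_vecs!, r, E, exp_v, Float32(beta), gm, gz)    # warm up / compile first
    @time sweep!(add_vecs!, r, E, exp_v, Float32(beta), gm, gz)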

Update


To continue optimizing dynam_serial I have tried to remove more allocations. The result is:

function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
                           exp_v::Array{Float64},beta::Float64)
    @inbounds for i=1:gm            
        r[i] = E[i,h,j]+beta*exp_v[i,j]
    end
    return findmax(r)[1]
end

function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
                      beta::Float64,Zprob::Array{Float64})
    v           = Array{Float64}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    r           = Array{Float64}(gm)
    exp_v       = Array{Float64}(gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1.0 # arbitrary initial value greater than convcrit
    while diff>convcrit
        A_mul_Bt!(exp_v,v,Zprob)
        diff = -Inf
        for h=1:gm
            for j=1:gz
                oldv = v[h,j]
                newv = add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
                v[h,j]= newv
                diff = max(diff, oldv-newv, newv-oldv)
            end
        end
    end
    return v
end


Switching the functions to use Float64 should increase speed (as CPUs are inherently optimized for 64-bit word lengths). Also, using the mutating A_mul_Bt! directly saves another allocation, and the copy(...) is avoided by switching between the arrays v and Tv.

这些优化如何改善您的运行时间?

How do these optimizations improve your running time?
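For reference, calling the Float64 version on the Float32 arrays built in the question would look something like this (a usage sketch, not something I have timed):

    E64     = convert(Array{Float64}, E)          # E and Zprob are Float32 in the question's setup
    Zprob64 = convert(Array{Float64}, Zprob)
    @time v = dynam_serial(E64, gm, gz, beta, Zprob64)   # beta is already a Float64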

Second update


Updated the code in the UPDATE section to use findmax. Also, changed dynam_serial to use v without Tv, as there was no need to save the old version except for the diff calculation, which is now done inside the loop.
