在Julia中对大型常量数据结构进行并行操作 [英] Operating in parallel on a large constant datastructure in Julia

查看:62
本文介绍了在Julia中对大型常量数据结构进行并行操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个很大的字符串向量向量: 大约有50,000个字符串向量, 每个字符串包含2-15个长度为1-20个字符的字符串.

MyScoringOperation是对字符串向量(基准)进行操作并返回10100分数的数组(作为Float64s)的函数.运行MyScoringOperation大约需要0.01秒(取决于基准长度)

function MyScoringOperation(state:State, datum::Vector{String})
      ...
      score::Vector{Float64} #Size of score = 10000

我有什么相当于嵌套循环. 外循环通常会运行500次迭代

data::Vector{Vector{String}} = loaddata()
for ii in 1:500 
    score_total = zeros(10100)
    for datum in data
         score_total+=MyScoringOperation(datum)
    end
end

在一台计算机上,在3000个小型测试用例(而不是50,000个)上,每个外循环要花费100-300秒.

我有3台安装了Julia 3.9的功能强大的服务器(可以轻松获得3台,然后可以在下一个规模上获得数百台).


我对@parallel有基本的经验,但是似乎在复制常量上花费了很多时间(在较小的测试用例中或多或少地挂起了

看起来像:

data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500 

    score_total = @parallel(+) for datum in data
         MyScoringOperation(state, datum)
    end
    state = update(state, score_total)
end

我对此实现与@parallel一起工作的方式的理解是:

对于每个人 ii:

  1. data划分为每个工人的卡盘
  2. 将卡盘发送给每个工人
  3. 在其中处理所有程序块
  4. 主过程在结果到达时对其求和.

我想删除步骤2, 因此,与其向每个工作人员发送大量数据, 我只是向每个工作人员发送了一系列索引,他们从自己的data副本中进行查找.甚至更好的做法是,只给每个块分配自己的块,然后让它们每次都重复使用(节省大量RAM).


分析支持了我对@parellel的功能的信念. 对于类似范围的问题(甚至更小的数据), 非并行版本的运行时间为0.09秒, 并行运行 探查器显示几乎所有时间都花了185秒. Profiler显示其中几乎100%是与网络IO交互所花费的.

解决方案

这应该使您入门:

function get_chunks(data::Vector, nchunks::Int)
    base_len, remainder = divrem(length(data),nchunks)
    chunk_len = fill(base_len,nchunks)
    chunk_len[1:remainder]+=1 #remained will always be less than nchunks
    function _it() 
        for ii in 1:nchunks
            chunk_start = sum(chunk_len[1:ii-1])+1
            chunk_end = chunk_start + chunk_len[ii] -1
            chunk = data[chunk_start: chunk_end]
            produce(chunk)
        end
    end
    Task(_it)
end

function r_chunk_data(data::Vector)
    all_chuncks = get_chunks(data, nworkers()) |> collect;
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())]
    #Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue
end



function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing 
    #TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe 
    @sync for rr in rem_results
        function gather(rr)
            res=fetch(rr)
            if total===nothing
                total=res
            else 
                total=red_acc(total,res)
            end
        end
        @async gather(rr)
    end
    total
end

function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
    rem_results = map(r_chunks) do rchunk
        function do_mapred()
            @assert r_chunk.where==myid()
            @pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
        end
        remotecall(r_chunk.where,do_mapred)
    end
    @pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
end

rchunk_data将数据分为多个块(由get_chunks方法定义),并将这些块分别发送到不同的工作程序,并在其中将它们存储在RemoteRefs中. RemoteRef是对其他过程(以及可能的计算机)上的内存的引用,该

prechunked_map_reduce对某种类型的图进行变型,以减少每个工作人员首先在其每个卡盘元素上运行map_fun的情况,然后使用red_acc(减少累加器功能)对卡盘中的所有元素进行缩小.最后,每个工作人员返回结果,然后通过使用fetch_reduce将它们全部缩减在一起,然后使用fetch_reduce将它们组合在一起,以便我们可以添加第一个完成的第一个结果.

fetch_reduce是无阻塞的获取和归约操作.我相信它没有竞争条件,尽管这可能是由于@async@sync中的实现细节.当茱莉亚0.4出现时,很容易将其锁定,以使其明显没有比赛条件.

此代码并没有真正经过严格处理.我不相信 您可能还需要考虑使卡盘大小可调,以便向更快的工作人员(如果有些人拥有更好的网络或更快的cpus)可以看到更多数据

您需要重新表达您的代码,这是一个映射减少问题,看起来并不难.


使用以下方法进行测试:

data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
chunk_data(:data, data)
@time prechunked_mapreduce(:data, mean, (+))

分配给8个工作人员(与启动器不在同一台计算机上)时,大约需要0.03秒.

vs仅在本地运行:

@time reduce(+,map(mean,data))

花费〜0.06秒.

I have a large vector of vectors of strings: There are around 50,000 vectors of strings, each of which contains 2-15 strings of length 1-20 characters.

MyScoringOperation is a function which operates on a vector of strings (the datum) and returns an array of 10100 scores (as Float64s). It takes about 0.01 seconds to run MyScoringOperation (depending on the length of the datum)

function MyScoringOperation(state:State, datum::Vector{String})
      ...
      score::Vector{Float64} #Size of score = 10000

I have what amounts to a nested loop. The outer loop typically would runs for 500 iterations

data::Vector{Vector{String}} = loaddata()
for ii in 1:500 
    score_total = zeros(10100)
    for datum in data
         score_total+=MyScoringOperation(datum)
    end
end

On one computer, on a small test case of 3000 (rather than 50,000) this takes 100-300 seconds per outer loop.

I have 3 powerful servers with Julia 3.9 installed (and can get 3 more easily, and then can get hundreds more at the next scale).


I have basic experience with @parallel, however it seems like it is spending a lot of time copying the constant (It more or less hang on the smaller testing case)

That looks like:

data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500 

    score_total = @parallel(+) for datum in data
         MyScoringOperation(state, datum)
    end
    state = update(state, score_total)
end

My understanding of the way this implementation works with @parallel is that it:

For Each ii:

  1. partitions data into a chuck for each worker
  2. sends that chuck to each worker
  3. works all process there chunks
  4. main procedure sums the results as they arrive.

I would like to remove step 2, so that instead of sending a chunk of data to each worker, I just send a range of indexes to each worker, and they look it up from their own copy of data. or even better, only giving each only their own chunk, and having them reuse it each time (saving on a lot of RAM).


Profiling backs up my belief about the functioning of @parellel. For a similarly scoped problem (with even smaller data), the non-parallel version runs in 0.09seconds, and the parallel runs in And the profiler shows almost all the time is spent 185 seconds. Profiler shows almost 100% of this is spend interacting with network IO.

解决方案

This should get you started:

function get_chunks(data::Vector, nchunks::Int)
    base_len, remainder = divrem(length(data),nchunks)
    chunk_len = fill(base_len,nchunks)
    chunk_len[1:remainder]+=1 #remained will always be less than nchunks
    function _it() 
        for ii in 1:nchunks
            chunk_start = sum(chunk_len[1:ii-1])+1
            chunk_end = chunk_start + chunk_len[ii] -1
            chunk = data[chunk_start: chunk_end]
            produce(chunk)
        end
    end
    Task(_it)
end

function r_chunk_data(data::Vector)
    all_chuncks = get_chunks(data, nworkers()) |> collect;
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())]
    #Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue
end



function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing 
    #TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe 
    @sync for rr in rem_results
        function gather(rr)
            res=fetch(rr)
            if total===nothing
                total=res
            else 
                total=red_acc(total,res)
            end
        end
        @async gather(rr)
    end
    total
end

function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
    rem_results = map(r_chunks) do rchunk
        function do_mapred()
            @assert r_chunk.where==myid()
            @pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
        end
        remotecall(r_chunk.where,do_mapred)
    end
    @pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
end

rchunk_data breaks the data into chunks, (defined by get_chunks method) and sends those chunks each to a different worker, where they are stored in RemoteRefs. The RemoteRefs are references to memory on your other proccesses(and potentially computers), that

prechunked_map_reduce does a variation on a kind of map reduce to have each worker first run map_fun on each of it's chucks elements, then reduce over all the elements in its chuck using red_acc (a reduction accumulator function). Finally each worker returns there result which is then combined by reducing them all together using red_acc this time using the fetch_reduce so that we can add the first ones completed first.

fetch_reduce is a nonblocking fetch and reduce operation. I believe it has no raceconditions, though this maybe because of a implementation detail in @async and @sync. When julia 0.4 comes out, it is easy enough to put a lock in to make it obviously have no race conditions.

This code isn't really battle hardened. I don;t believe the You also might want to look at making the chuck size tunable, so that you can seen more data to faster workers (if some have better network or faster cpus)

You need to reexpress your code as a map-reduce problem, which doesn't look too hard.


Testing that with:

data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
chunk_data(:data, data)
@time prechunked_mapreduce(:data, mean, (+))

Took ~0.03 seconds, when distributed across 8 workers (none of them on the same machine as the launcher)

vs running just locally:

@time reduce(+,map(mean,data))

took ~0.06 seconds.

这篇关于在Julia中对大型常量数据结构进行并行操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆