Embarrassingly parallel for loop with complex outputs in each iteration

Problem description

I have an embarrassingly parallel for loop in python (to be repeated n times), where each iteration performs a complex task and returns a mix of numpy arrays and dicts (so not a single number to be filled into an array; think of the output as a complex bunch for now). The repetitions don't need to run in any particular order - I just need to be able to identify each i of the n iterations uniquely (e.g. to save the results of each repetition independently). In fact they don't even need to be identified by an index/counter, just by something unique, since they don't need to be ordered (I can easily fill them back into a bigger array).

To give a more concrete example, I would like to parallelize the following task:

import os
import numpy as np

def do_complex_task(complex_input1, input2, input3, input_n):
  "all important computation done here - independent of i or n"

  inner_result1, inner_result2 = np.zeros(100), np.zeros(100)
  for smaller_input in complex_input1:
    inner_result1 = do_another_complex_task(smaller_input, input2, input3, input_n)
    inner_result2 = do_second_complex_task(smaller_input, input2, input3, input_n)

  # do some more to produce few more essential results
  dict_result = blah()

  unique_identifier = get_unique_identifier_for_this_thread() # I don't know how

  # save results for each repetition independently before returning, 
  # instead of waiting for full computation to be done which can take a while
  out_path = os.path.join(out_dir, 'repetition_{}.pkl'.format(unique_identifier))

  return inner_result1, inner_result2, inner_result_n, dict_result


def main_compute():
  "main method to run the loop"

  n = 256 # ideally any number, but multiples of 4 possible, for even parallelization.

  result1  = np.zeros([n, 100])
  result2  = np.zeros([n, 100])
  result_n = np.zeros([n, 100])
  dict_result = [None] * n  # pre-allocate so each repetition's dict can be assigned by index

  # this for loop does not need to be computed in any order (range(n) is an illustration)
  # although this order would be ideal, as it makes it easy to populate results into a bigger array
  for i in range(n):
    # this computation has nothing to do with i or n!
    result1[i, :], result2[i, :], result_n[i, :], dict_result[i] = do_complex_task(complex_input1, input2, input3, input_n)

  # I need to parallelize the above loop to speed up stupidly parallel processing.


if __name__ == '__main__':
    pass
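
One way to obtain the unique identifier the code above asks about is simply to make the repetition index an argument of the worker and let Pool.map supply it. The following is a minimal sketch under that assumption; one_repetition, its toy body and the '.' output directory are placeholders, not the question's actual task:

import os
import pickle
from functools import partial
from multiprocessing import Pool

import numpy as np

def one_repetition(i, shared_input, out_dir):
    # 'i' doubles as the unique identifier for this repetition
    arr = np.full(100, i, dtype=float)                    # stand-in for the real array results
    info = {'repetition': i, 'mean': float(arr.mean())}   # stand-in for the dict result

    # save this repetition independently, tagged by its index
    out_path = os.path.join(out_dir, 'repetition_{}.pkl'.format(i))
    with open(out_path, 'wb') as f:
        pickle.dump((arr, info), f)
    return arr, info

if __name__ == '__main__':
    n = 8
    shared_input = np.arange(1000)  # placeholder for complex_input1
    func = partial(one_repetition, shared_input=shared_input, out_dir='.')
    with Pool(processes=4) as pool:
        results = pool.map(func, range(n))  # returned in the same order as range(n)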

I've read fairly widely, and it is not clear which strategy would be the smartest and simplest without running into reliability issues.

Also, complex_input1 can be large, so I'd prefer to avoid a lot of I/O overhead from pickling it.
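
If pickling complex_input1 for every task does become a bottleneck, one common pattern (sketched here as a suggestion, not something the code above already does) is to hand the large object to each worker process exactly once via Pool's initializer and keep it in a module-level global:

from multiprocessing import Pool

_big_input = None  # filled in once per worker process by _init_worker

def _init_worker(big_input):
    global _big_input
    _big_input = big_input

def one_repetition(i):
    # _big_input is already present in this worker; only the small index 'i' is pickled per task
    return i, len(_big_input)

if __name__ == '__main__':
    complex_input1 = list(range(10**6))  # placeholder for the real large input
    with Pool(processes=4, initializer=_init_worker, initargs=(complex_input1,)) as pool:
        results = pool.map(one_repetition, range(16))

On Linux with the fork start method, data created before the Pool is inherited copy-on-write anyway, but the initializer keeps the pattern explicit and portable to spawn-based platforms.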

I can certainly return a single list (with all the complex parts) that gets appended to a master list, which can later be assembled into the format I like (rectangular arrays, etc.). This can be done easily with joblib, for example, as sketched below. However, I am trying to learn from you all to identify good solutions.
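
For reference, the joblib route mentioned above could look roughly like this (a sketch only; one_repetition and its body are placeholders for the real task):

from joblib import Parallel, delayed

def one_repetition(i, shared_input):
    # placeholder for do_complex_task; returns a mix of types, which is fine
    return i, {'i_squared': i * i, 'n_inputs': len(shared_input)}

if __name__ == '__main__':
    shared_input = list(range(1000))  # placeholder for complex_input1
    results = Parallel(n_jobs=4)(
        delayed(one_repetition)(i, shared_input) for i in range(16)
    )
    # 'results' is a list of the returned tuples, in order of i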

EDIT: I think I am settling on the following solution. Let me know what could go wrong with it, or how I can improve it further in terms of speed, avoiding side effects, etc. After a few unstructured trials on my laptop, it is not clear whether this gives a clear speedup.

from functools import partial
from multiprocessing import Pool, Manager

import numpy as np

chunk_size = int(np.ceil(num_repetitions / num_procs))
with Manager() as proxy_manager:
    # the large, read-only inputs are placed in a manager list and bound to the worker function
    shared_inputs = proxy_manager.list([complex_input1, input2, another, blah])
    partial_func_holdout = partial(key_func_doing_work, *shared_inputs)

    with Pool(processes=num_procs) as pool:
        results = pool.map(partial_func_holdout, range(num_repetitions), chunksize=chunk_size)

Recommended answer

There's a built-in solution for this in the form of multiprocessing.Pool.map.

import multiprocessing
from functools import partial

def do_task(a, b):
    # each call returns a mix of types (a number and a dict); anything picklable works
    return (42, {'x': a * 2, 'y': b[::-1]})

if __name__ == '__main__':
    a_values = ['Hello', 'World']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(partial(do_task, b='fixed b value'), a_values)
    print(results)

After this, results will contain the results in the same order as a_values.

The requirement is that the arguments and return values are picklable. Beyond that they can be as complicated as you like, although with a lot of data there may be some performance penalty.

I don't know if this is what you consider a good solution; I've used it many times and it works great for me.

You can put the return values in a class, but personally I feel that doesn't really offer benefits since Python doesn't have static type checking.

It just starts up to #processes jobs in parallel. They should be independent, and the order doesn't matter (I think they're started in the provided order, but they may complete in a different order).
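
Since the results come back in input order, unpacking them into the rectangular arrays from the question is straightforward; a small sketch, assuming each worker returns an (array, array, dict) tuple:

import numpy as np

# toy stand-in for the list that pool.map would return, one tuple per repetition
results = [(np.full(100, i), np.full(100, -i), {'rep': i}) for i in range(4)]

n = len(results)
result1 = np.zeros([n, 100])
result2 = np.zeros([n, 100])
dict_results = []

# the list index is the repetition index, because pool.map preserves input order
for i, (arr1, arr2, res_dict) in enumerate(results):
    result1[i, :] = arr1
    result2[i, :] = arr2
    dict_results.append(res_dict)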

Example based on this answer.
