How to implement a reduce operation in Python multiprocessing?

Question

I'm an expert parallel programmer in OpenMP and C++. Now I'm trying to understand parallelism in Python and the multiprocessing library.

In particular, I'm trying to parallelize this simple code, which increments a randomly chosen element of an array 100 times:

from random import randint
import multiprocessing as mp
import numpy as np

def random_add(x):
    x[randint(0,len(x)-1)]  += 1

if __name__ == "__main__":
    print("Serial")
    x = np.zeros(8)
    for i in range(100):
        random_add(x)
    print(x)

    print("Parallel")
    x = np.zeros(8)    
    processes = [mp.Process(target = random_add, args=(x,)) for i in range(100)]
    for p in processes:
        p.start()
    print(x)

However, this is the output:

Serial
[  9.  18.  11.  15.  16.   8.  10.  13.]
Parallel
[ 0.  0.  0.  0.  0.  0.  0.  0.]

Why does this happen? I think I have an explanation: since we are using multiprocessing (and not multithreading), each process has its own section of memory, i.e., each spawned process has its own copy of x, which is destroyed once random_add(x) terminates. As a result, the x in the main program is never actually updated.

Is this correct? And if so, how can I solve this problem? In short, I need a global reduce operation that sums the results of all the random_add calls, giving the same kind of result as the serial version.

Recommended answer

You should use shared memory objects in your case:

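from random import randint
import multiprocessing as mp

def random_add(x):
    # Increment one randomly chosen element of x in place.
    x[randint(0, len(x) - 1)] += 1

def locked_random_add(x):
    # += on a shared Array is a read-modify-write, so hold the array's
    # lock for the whole update to avoid losing increments.
    with x.get_lock():
        random_add(x)

if __name__ == "__main__":
    print("Serial")
    x = [0] * 8
    for i in range(100):
        random_add(x)
    print(x)

    print("Parallel")
    x = mp.Array('i', 8)  # shared array of 8 C ints, zero-initialized
    processes = [mp.Process(target=locked_random_add, args=(x,)) for i in range(100)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()  # wait for every child before reading the result
    print(x[:])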

I've changed the numpy array to a plain list to keep the code clear.
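If what you want is closer to an OpenMP-style reduction, one possible alternative (not part of the answer above) is to avoid shared state altogether: let each worker build a private partial result and combine the partials in the parent. The sketch below assumes a process pool with four workers; the names partial_counts, add_counts and parallel_random_add, and the choice of seeds, are made up for illustration.

```python
import multiprocessing as mp
import random
from functools import reduce

SIZE = 8  # length of the array, as in the question

def partial_counts(args):
    """Worker: apply n random increments to a private list and return it."""
    seed, n = args
    rng = random.Random(seed)  # per-task generator (seed choice is arbitrary)
    counts = [0] * SIZE
    for _ in range(n):
        counts[rng.randrange(SIZE)] += 1
    return counts

def add_counts(a, b):
    """Reduction step: element-wise sum of two partial results."""
    return [u + v for u, v in zip(a, b)]

def parallel_random_add(total=100, workers=4):
    # Split the increments as evenly as possible across the tasks.
    chunks = [total // workers + (i < total % workers) for i in range(workers)]
    tasks = list(enumerate(chunks))  # (seed, n) pairs
    with mp.Pool(workers) as pool:
        partials = pool.map(partial_counts, tasks)  # "map" phase in the children
    return reduce(add_counts, partials, [0] * SIZE)  # "reduce" phase in the parent

if __name__ == "__main__":
    x = parallel_random_add()
    print(x, sum(x))  # the counts always add up to 100
```

Because no memory is shared, no lock is needed, and only a handful of processes are started instead of 100. The individual counts will differ from the serial run since the random draws differ, but the total number of increments is the same.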
