Python multiprocessing: can I reuse processes (already parallelized functions) with updated global variable?


Problem description

First, let me show you the current setup I have:

import multiprocessing.pool
from contextlib import closing
import os

def big_function(param):
    process(another_module.global_variable[param])


def dispatcher():
    # share a read-only global variable, taking advantage of Unix's
    # copy-on-write fork semantics
    # https://stackoverflow.com/questions/19366259/
    another_module.global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
        multiprocessing_result = list(p.imap_unordered(big_function, params))

    return multiprocessing_result

Here I use a shared variable that is updated before the process pool is created. It contains a huge amount of data, and that indeed gave me a speedup, so it does not seem to be pickled now. Also, this variable belongs to the scope of an imported module (in case that matters).
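As a hypothetical illustration of that layout (the name another_module comes from the question; this stub is my own assumption about its minimal contents):

# another_module.py -- hypothetical stand-in for the imported module;
# the parent overwrites this list before forking the workers
global_variable = []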

When I tried to create a setup like this:

another_module.global_variable = []

p = multiprocessing.pool.Pool(processes=os.cpu_count())

def dispatcher():
    # share a read-only global variable, taking advantage of Unix's
    # copy-on-write fork semantics
    # https://stackoverflow.com/questions/19366259/
    another_module_global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    multiprocessing_result = list(p.imap_unordered(big_function, params))

    return multiprocessing_result  

p "remembered" that the global shared list was empty and refused to use the new data when it was called from inside dispatcher.

Now here is the problem: processing ~600 data objects on 8 cores with the first setup above, my parallel computation runs for 8 seconds, while the single-threaded version takes 12 seconds.

This is what I think: since multiprocessing pickles data and I need to re-create the processes each time, I also need to pickle the function big_function(), so I lose time on that. The data side was partially solved by using the global variable (but I still need to recreate the pool on each update of it).

What can I do about the instances of big_function() (which depends on many other functions from other modules, numpy, etc.)? Can I create os.cpu_count() copies of it once and for all, somehow feed new data into them and receive the results, reusing the workers?

Recommended answer

First, to address the "remembering" issue:

another_module.global_variable = []
p = multiprocessing.pool.Pool(processes=os.cpu_count())

def dispatcher():
    another_module_global_variable = huge_list
    params = range(len(another_module.global_variable))
    multiprocessing_result = list(p.imap_unordered(big_function, params))
    return multiprocessing_result 

The problem seems to be the point at which you create the Pool instance.

Why is that?

It's because when you create an instance of Pool, it sets up a number of workers (by default equal to the number of CPU cores) and they are all started (forked) at that moment. That means the workers have a copy of the parent's global state (including another_module.global_variable), and under the copy-on-write policy, when you update the value of another_module.global_variable you change it only in the parent's process. The workers still hold a reference to the old value. That is why you have a problem with it.

Here are a couple of links that give more explanation on this: this and this.

Here is a small snippet where you can swap the line that changes the global variable's value with the line that starts the process, and check what is printed in the child process.

from __future__ import print_function
import multiprocessing as mp

glob = dict()
glob[0] = [1, 2, 3]


def printer(a):
    print(globals())
    print(a, glob[0])


if __name__ == '__main__':
    # with a fork-based start method (Unix), the child inherits the value of
    # glob[0] as it is at start(); swap the start() call and the assignment
    # below to see the child print 'test' instead of [1, 2, 3]
    p = mp.Process(target=printer, args=(1,))
    p.start()
    glob[0] = 'test'
    p.join()

This is Python 2.7 code, but it works on Python 3.6 too.

What would be the solution for this issue?

Well, go back to the first solution: update the value of the imported module's variable first, and only then create the pool of processes.

Now to the real issue: the lack of speedup.

Here is what the documentation says about how functions are pickled:

Note that functions (built-in and user-defined) are pickled by "fully qualified" name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function's code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised.
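To illustrate the quoted behavior, here is a small sketch of my own (not from the original post): pickling a module-level function serializes only its qualified name, so the payload stays tiny no matter how large the function body or its dependencies are.

import pickle

def my_big_function(x):
    # imagine lots of code and heavy dependencies here
    return x

if __name__ == '__main__':
    payload = pickle.dumps(my_big_function)
    # only the module name and 'my_big_function' are stored, not the code object
    print(len(payload), payload)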

This means that pickling your function should not be a time-wasting step, or at least not by itself. What causes the lack of speedup is that, for the ~600 data objects in the list you pass to the imap_unordered call, each one of them is shipped to a worker process individually. Once again, the underlying implementation of multiprocessing.Pool may be the cause of this issue.

If you go deeper into the multiprocessing.Pool implementation, you will see that two Threads using a Queue handle the communication between the parent and all the child (worker) processes. Because of this, and because all the workers constantly request arguments for the function and constantly return responses, you end up with a very busy parent process. That is why a lot of time is spent on the 'dispatching' work of passing data to and from the worker processes.
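As a rough, hedged illustration of that dispatching cost (my own toy benchmark, not part of the original answer), you can time a deliberately cheap task with different chunksize values; the exact numbers will vary by machine:

import multiprocessing.pool
from contextlib import closing
import os
import time

def tiny_task(i):
    # deliberately cheap, so parent/worker communication dominates the runtime
    return i * i

if __name__ == '__main__':
    data = range(600)
    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as pool:
        for cs in (1, 10, 50):
            start = time.time()
            list(pool.imap_unordered(tiny_task, data, chunksize=cs))
            print('chunksize=%d: %.4f s' % (cs, time.time() - start))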

What to do?

Try to increase the number of data objects a worker process handles at any one time. In your example, you pass one data object after another, so each worker process is working on exactly one data object at any time. Why not hand each worker a bigger batch of data objects? That way you keep each process busier, working through 10, 20 or even more data objects at a time. From what I can see, imap_unordered has a chunksize argument. It is set to 1 by default. Try increasing it. Something like this:

import multiprocessing.pool
from contextlib import closing
import os

def big_function(param):
    # with chunksize > 1 the pool still calls big_function once per index,
    # but ships the indices to the workers in batches, cutting dispatch overhead
    return process(another_module.global_variable[param])

def dispatcher():
    # share a read-only global variable, taking advantage of Unix's
    # copy-on-write fork semantics
    # https://stackoverflow.com/questions/19366259/
    another_module.global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
        multiprocessing_result = list(p.imap_unordered(big_function, params, chunksize=10))

    return multiprocessing_result

Advice:

  1. I see that you create params as a list of indices that you use to pick a particular data object in big_function. You could instead create tuples that represent the first and last index of a slice and pass those to big_function. This is another way of increasing the chunk of work, and it is an alternative to the chunksize approach I proposed above; a sketch of it follows after this list.
  2. Unless you explicitly want Pool(processes=os.cpu_count()), you can omit the argument. It takes the number of CPU cores by default.

Sorry for the length of the answer, and for any typos that might have sneaked in.

