Leveraging "Copy-on-Write" to Copy Data to Multiprocessing.Pool() Worker Processes
Question
I have a bit of multiprocessing Python code that looks a bit like this:
import time
from multiprocessing import Pool

import numpy as np


class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000)  # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = [(self, i) for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print(result)

    def analyze(self, i):
        time.sleep(10)
        return i ** 2


def call_method(args):
    my_instance, i = args
    return my_instance.analyze(i)


if __name__ == '__main__':
    my_instance = MyClass()
    my_instance.my_multithreaded_analysis()
After reading about how memory works in other StackOverflow answers, such as this one on Python multiprocessing memory usage, I was under the impression that this would not use memory in proportion to the number of processes I used for multiprocessing, since fork is copy-on-write and I have not modified any of the attributes of my_instance. However, I do see high memory usage for all processes when I run top; it says most of my processes are using a lot of memory (this is top output from OSX, but I can replicate it on Linux).
My question is basically: am I interpreting this correctly, in that my instance of MyClass is actually duplicated across the pool? And if so, how can I prevent this; should I just not use a construction like this? My goal is to reduce the memory usage of a computational analysis.
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE
2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping
2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping
2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping
2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping
2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping
2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping
2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping
2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping
2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping
2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping
2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping
Answer
Anything sent to pool.map (and related methods) isn't actually using shared copy-on-write resources. The values are "pickled" (Python's serialization mechanism), sent over pipes to the worker processes, and unpickled there, which reconstructs the object in the child from scratch. Thus, each child in this case ends up with a copy-on-write version of the original data (which it never uses, because it was told to use the copy sent via IPC), plus a personal recreation of the original data that was reconstructed in the child and is not shared.
If you want to take advantage of forking's copy-on-write benefits, you can't send the data (or objects referencing the data) over the pipe. You have to store it in a location the child can find by accessing its own globals. So, for example:
import os
import time
from multiprocessing import Pool


class MyClass(object):
    def __init__(self):
        self.myAttribute = os.urandom(1024*1024*1024)  # basically a big memory struct (~1GB)

    def my_multithreaded_analysis(self):
        arg_lists = list(range(10))  # Don't pass self
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print(result)

    def analyze(self, i):
        time.sleep(10)
        return i ** 2


def call_method(i):
    # Implicitly use the global copy of my_instance, not one passed as an argument
    return my_instance.analyze(i)


# Constructed globally and unconditionally, so the instance exists
# prior to forking, in a commonly accessible location
my_instance = MyClass()

if __name__ == '__main__':
    my_instance.my_multithreaded_analysis()
By not passing self, you avoid making copies and just use the single global object that was copy-on-write mapped into the child. If you need more than one object, you might make a global list or dict mapping to the instances of the objects prior to creating the pool, then pass the index or key that can look up the object as part of the argument(s) to pool.map. The worker function then uses the index/key (which had to be pickled and sent to the child over IPC) to look up the value (copy-on-write mapped) in the global dict (also copy-on-write mapped), so you copy cheap information to look up expensive data in the child without copying it.
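A minimal sketch of that registry pattern, using a hypothetical REGISTRY dict and an explicitly requested fork start method (assumed available, i.e. not Windows); os.urandom provides a ~1 MB stand-in for the real data:

```python
import os
from multiprocessing import get_context

# Hypothetical registry, populated before the pool forks so every
# child inherits it via copy-on-write.
REGISTRY = {}


def call_method(args):
    key, i = args
    data = REGISTRY[key]  # only the cheap key crossed the pipe
    return len(data) + i


def main():
    REGISTRY["big"] = os.urandom(1024 * 1024)  # ~1 MB stand-in for the real data
    ctx = get_context("fork")  # fork, so children inherit REGISTRY
    with ctx.Pool(processes=2) as pool:
        return pool.map(call_method, [("big", i) for i in range(4)])


if __name__ == "__main__":
    print(main())  # [1048576, 1048577, 1048578, 1048579]
```

Only the short ("big", i) tuples are pickled per task; the megabyte of data stays in the forked pages.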
If the objects are small, they will end up copied even if you never write to them. CPython is reference-counted, and the reference count lives in the common object header, which is updated constantly merely by referencing the object, even through a logically non-mutating reference. So small objects (and every other object allocated in the same memory page) will be written to, and therefore copied. For large objects (your hundred-million-element numpy array), most of the data would remain shared as long as you don't write to it, since the header occupies only one of its many pages.
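You can watch those header writes happen with sys.getrefcount; this small sketch just illustrates that binding a new name mutates the object's memory even though the object's value never changes:

```python
import sys

x = object()
before = sys.getrefcount(x)
y = x  # a logically non-mutating reference...
after = sys.getrefcount(x)
print(after - before)  # 1: the refcount field in x's header was written
```

In a forked child, that same one-word write is enough to make the kernel copy the whole page the header sits on.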
Changed in Python 3.8: on macOS, the spawn start method is now the default (see the multiprocessing docs). Spawn does not leverage copy-on-write.
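If you rely on copy-on-write on macOS under Python 3.8+, you can opt back into fork via get_context (with the usual caveat that fork on macOS can be unsafe with some system frameworks); a minimal sketch:

```python
import multiprocessing as mp


def square(i):
    return i * i


if __name__ == "__main__":
    ctx = mp.get_context("fork")  # explicitly request fork; the default on Linux
    with ctx.Pool(processes=2) as pool:
        print(pool.map(square, range(4)))  # [0, 1, 4, 9]
```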