Minimize overhead in Python multiprocessing.Pool with numpy/scipy
Question
I've spent several hours on different attempts to parallelize my number-crunching code, but it only gets slower when I do so. Unfortunately, the problem disappears when I try to reduce it to the example below and I don't really want to post the whole program here. So the question is: what pitfalls should I avoid in this type of program?
(Note: follow-up after Unutbu's answer is at the bottom.)
The situation:
- It's about a module that defines a class BigData with a lot of internal data. In the example, there is one list ff of interpolation functions; in the actual program, there are more, e.g., ffA[k], ffB[k], ffC[k].
- The calculation would be classified as "embarrassingly parallel": the work can be done on smaller chunks of data at a time. In the example, that's do_chunk().
- The approach shown in the example results, in my actual program, in the worst performance: about 1 second per chunk (on top of the 0.1 second or so of actual calculation time when done in a single thread). So, for n=50, do_single() would run in 5 seconds and do_multi() would run in 55 seconds.
- I also tried to split up the work by slicing the xi and yi arrays into contiguous blocks and iterating over all k values in each chunk. That worked a bit better: now there was no difference in total execution time whether I used 1, 2, 3, or 4 processes. But of course, I want to see an actual speedup!
- This may be related: Multiprocessing.Pool makes Numpy matrix multiplication slower. However, elsewhere in the program I used a multiprocessing pool for calculations that were much more isolated: a function (not bound to a class) that looks something like def do_chunk(array1, array2, array3) and does numpy-only calculations on those arrays. There, there was a significant speed boost.
- The CPU usage scales with the number of parallel processes as expected (300% CPU usage for three processes).
#!/usr/bin/python2.7

import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm = 0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm == 0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n, n, n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi):  # must be outside the class so apply_async can pickle it
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2, m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
Output:
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
Timings are on an Intel Core i3-3227 CPU with 2 cores, 4 threads, running 64-bit Linux. For the actual program, the multi-processing version (pool mechanism, even if using only one core) was a factor 10 slower than the single-process version.
Follow-up
Unutbu's answer got me on the right track. In the actual program, self was pickled into a 37 to 140 MB object that needed to be passed to the worker processes. Worse, Python pickling is very slow; the pickling itself took a few seconds, and it happened for each chunk of work passed to the worker processes. Other than pickling and passing big data objects, the overhead of apply_async in Linux is very small; for a small function (adding a few integer arguments), it takes only 0.2 ms per apply_async/get pair. So, splitting up the work into very small chunks is not a problem by itself. Therefore, I transmit all big array arguments as indices into global variables, and I keep the chunk size small for the purpose of CPU cache optimization.
The global variables are stored in a global dict; the entries are deleted in the parent process immediately after the worker pool is set up. Only the keys into the dict are transmitted to the worker processes. The only big data that goes through pickling/IPC is the new data created by the workers.
#!/usr/bin/python2.7

import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array of the same shape as xi, yi
        zi = k*np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd'+mp_key] = self  # BigData
        _mproc_data['xi'+mp_key] = xi
        _mproc_data['yi'+mp_key] = yi
        pool = Pool(processes=num_proc)
        # the processes have now inherited the global variable; clean up in the parent process
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v+mp_key]
        # set up indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n//n_chunks
        i1list = np.arange(0, n, chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks)  # placeholder
        procs = []
        for i in range(n_chunks):
            p = pool.apply_async(_do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]))
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")
        # allocate space for combined results
        zi = np.zeros_like(xi)
        # get data from workers and finish
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30)  # timeout allows ctrl-C handling
        pool.close()
        pool.join()
        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)
Here are the results of a speed test (again, 2 cores, 4 threads), varying the number of worker processes and the amount of memory in the chunks (total bytes of the xi, yi, and zi array slices). The numbers are in "million result values per second", but that doesn't matter much for the comparison. The row for "1 process" is a direct call to do_chunk with the full input data, without any subprocesses.
#Proc   125K    250K    500K    1000K   unlimited
1                                       0.82
2       4.28    1.96    1.3     1.31
3       2.69    1.06    1.06    1.07
4       2.17    1.27    1.23    1.28
The impact of data size in memory is quite significant. The CPU has 3 MB shared L3 cache, plus 256 KB L2 cache per core. Note that the calculation also needs access to several MB of internal data of the BigData
object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, and 3 is the slowest.
Answer
Try to reduce interprocess communication. In the multiprocessing module, all (single-computer) interprocess communication is done through Queues. Objects passed through a Queue are pickled, so try to send fewer and/or smaller objects through the Queue.
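Before tuning, it can help to measure directly what a task's arguments cost to send. A small sketch (the array here merely stands in for a big object such as BigData; the key name is illustrative):

```python
import pickle
import time
import numpy as np

# Stand-in for a big object like BigData: ~80 MB of float64 data.
big = np.random.uniform(size=10 * 1000 * 1000)

t0 = time.time()
blob = pickle.dumps(big, protocol=pickle.HIGHEST_PROTOCOL)
dt = time.time() - t0

# What the workers actually need: a key and two slice indices.
small = pickle.dumps(('bd12345', 0, 500000))

print("big payload:   %d bytes, pickled in %.3f s" % (len(blob), dt))
print("small payload: %d bytes" % len(small))
```

The big payload is tens of megabytes and must be pickled, sent, and unpickled once per task; the small one is a few dozen bytes.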
Do not send self, the instance of BigData, through the Queue. It is rather big, and it gets bigger as the amount of data in self grows:
In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
Every time pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi)) is called, self is pickled in the main process and unpickled in the worker process. The size of len(pickle.dumps(BigData(N))) grows as N increases.
Let the data be read from a global variable. On Linux, you can take advantage of Copy-on-Write. As Jan-Philip Gehrcke explains:
After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.
Thus, you can avoid passing instances of BigData through the Queue by simply defining the instance as a global, bd = BigData(n) (as you are already doing), and referring to its values in the worker processes (e.g. in _do_chunk_wrapper). It basically amounts to removing self from the call to pool.apply_async:
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
and accessing bd as a global, with the necessary attendant changes to _do_chunk_wrapper's call signature.
Try to pass longer-running functions, func, to pool.apply_async. If you have many quickly-completing calls to pool.apply_async, then the overhead of passing arguments and return values through the Queue becomes a significant part of the overall time. If instead you make fewer calls to pool.apply_async and give each func more work to do before returning a result, then interprocess communication becomes a smaller fraction of the overall time.
Below, I modified _do_chunk_wrapper to accept k_start and k_end arguments, so that each call to pool.apply_async computes the sum for many values of k before returning a result.
import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm = 0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm == 0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n, n, n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc+1)))
        for i in range(len(ks)-1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi):
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2, m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
This yields:
Initialized: 0.15 seconds
Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds
Compared to the original code:
Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds