使用Pool.map()进行多处理时如何解决内存问题? [英] How to solve memory issues problems while multiprocessing using Pool.map()?
问题描述
我已经将程序编写如下:
- 将大型文本文件读取为
pandas dataframe
- 然后
groupby
使用特定的列值来拆分数据并存储为数据帧列表. - 然后将数据通过管道传输到
multiprocess Pool.map()
以并行处理每个数据帧.
一切都很好,该程序在我的小型测试数据集上运行良好.但是,当我处理大量数据(大约14 GB)时,内存消耗呈指数增长,然后冻结计算机或被杀死(在HPC群集中).
我添加了代码,以在数据/变量不可用时立即清除内存.一旦完成,我也将关闭游泳池.仍然有14 GB的输入,我只期望2 * 14 GB的内存负担,但是看来情况还在继续.我还尝试使用chunkSize and maxTaskPerChild, etc
进行调整,但在测试和大型文件的优化方面都没有看到任何差异.
我认为,当我启动multiprocessing
时,需要在此代码位置对此代码进行改进.
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
但是,我要发布整个代码.
测试示例:我创建了一个最大250 mb的测试文件("genome_matrix_final-chr1234-1mb.txt")并运行了该程序.当我检查系统监视器时,可以看到内存消耗增加了大约6 GB.我不太清楚为什么250 mb文件加上一些输出会占用这么多的内存空间.我已经通过下拉框共享了该文件,如果它有助于查看实际问题. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl
有人可以建议我如何摆脱这个问题吗?
我的python脚本:
#!/home/bin/python3
import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource
print()
print('Checking required modules')
print()
''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt" # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt" # test file 02
#genome_matrix_file = "genome_matrix_final.txt" # large file
def main():
with open("genome_matrix_header.txt") as header:
header = header.read().rstrip('\n').split('\t')
print()
time01 = time.time()
print('starting time: ', time01)
'''load the genome matrix file onto pandas as dataframe.
This makes is more easy for multiprocessing'''
gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)
# now, group the dataframe by chromosome/contig - so it can be multiprocessed
gen_matrix_df = gen_matrix_df.groupby('CHROM')
# store the splitted dataframes as list of key, values(pandas dataframe) pairs
# this list of dataframe will be used while multiprocessing
gen_matrix_df_list = collections.OrderedDict()
for chr_, data in gen_matrix_df:
gen_matrix_df_list[chr_] = data
# clear memory
del gen_matrix_df
'''Now, pipe each dataframe from the list using map.Pool() '''
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
del gen_matrix_df_list # clear memory
p.close()
p.join()
# concat the results from pool.map() and write it to a file
result_merged = pd.concat(result)
del result # clear memory
pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)
print()
print('completed all process in "%s" sec. ' % (time.time() - time01))
print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
print()
'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):
print()
time02 = time.time()
# index position of the samples in genome matrix file
sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
{'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
{'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
{'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
{'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
{'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
{'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
{'8a': 32, '8b': 17}]
# sample index stored as ordered dictionary
sample_idx_ord_list = []
for ids in sample_idx:
ids = collections.OrderedDict(sorted(ids.items()))
sample_idx_ord_list.append(ids)
# for haplotype file
header = ['contig', 'pos', 'ref', 'alt']
# adding some suffixes "PI" to available sample names
for item in sample_idx_ord_list:
ks_update = ''
for ks in item.keys():
ks_update += ks
header.append(ks_update+'_PI')
header.append(ks_update+'_PG_al')
#final variable store the haplotype data
# write the header lines first
haplotype_output = '\t'.join(header) + '\n'
# to store the value of parsed the line and update the "PI", "PG" value for each sample
updated_line = ''
# read the piped in data back to text like file
matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)
matrix_df = matrix_df.rstrip('\n').split('\n')
for line in matrix_df:
if line.startswith('CHROM'):
continue
line_split = line.split('\t')
chr_ = line_split[0]
ref = line_split[2]
alt = list(set(line_split[3:]))
# remove the alleles "N" missing and "ref" from the alt-alleles
alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))
# if no alt alleles are found, just continue
# - i.e : don't write that line in output file
if len(alt_up) == 0:
continue
#print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
#so, we have data for CHR, POS, REF, ALT so far
# now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
sample_data_for_vcf = []
for ids in sample_idx_ord_list:
sample_data = []
for key, val in ids.items():
sample_value = line_split[val]
sample_data.append(sample_value)
# now, update the phased state for each sample
# also replacing the missing allele i.e "N" and "-" with ref-allele
sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
sample_data_for_vcf.append(str(chr_))
sample_data_for_vcf.append(sample_data)
# add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
# and .. write it to final haplotype file
sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
'\t' + sample_data_for_vcf + '\n'
haplotype_output += updated_line
del matrix_df # clear memory
print('completed haplotype preparation for chromosome/contig "%s" '
'in "%s" sec. ' %(chr_, time.time()-time02))
print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))
# return the data back to the pool
return pd.read_csv(io.StringIO(haplotype_output), sep='\t')
''' to monitor memory '''
def current_mem_usage():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.
if __name__ == '__main__':
main()
赏金猎人更新:
我已经使用Pool.map()
实现了多处理,但是代码造成了很大的内存负担(输入测试文件〜300 mb,但是内存负担约为6 GB).我只期望最大内存为3 * 300 mb.
- 有人可以解释一下,是什么原因导致了这么小的文件和如此小的长度计算的如此大的内存需求.
- 此外,我正在尝试回答并使用它来改善大型程序中的多进程.因此,添加任何不会过多改变计算部分(CPU绑定进程)的结构的方法,模块应该没问题.
- 我提供了两个测试文件以用于测试目的,以便与代码一起使用.
- 附件代码为完整代码,因此复制粘贴时应能按预期工作.任何更改都应仅用于改善多处理步骤中的优化.
先决条件
-
在Python中(以下我将使用Python 3.6.5的64位构建),所有对象都是对象.这有它的开销,并且使用
getsizeof
对象的大小(以字节为单位):>>> import sys >>> sys.getsizeof(42) 28 >>> sys.getsizeof('T') 50
- 使用fork系统调用(默认值为* nix,请参见
multiprocessing.get_start_method()
)创建子进程时,不会复制父级的物理内存,并且 PSS (比例集大小)是更合适的指标,用于评估分叉的内存使用情况应用.这是页面上的示例:
- 进程A具有50 KiB的未共享内存
- 进程B有300 KiB的未共享内存
- 进程A和进程B都具有100 KiB的相同共享内存区域
由于PSS定义为一个进程的未共享内存和与其他进程共享的内存比例之和,因此这两个进程的PSS如下:
- 过程A的PSS = 50 KiB +(100 KiB/2)= 100 KiB
- 流程B的PSS = 300 KiB +(100 KiB/2)= 350 KiB
数据帧
不让我们单独看看您的DataFrame
. memory_profiler
将为我们提供帮助.
justpd.py
#!/usr/bin/env python3
import pandas as pd
from memory_profiler import profile
@profile
def main():
with open('genome_matrix_header.txt') as header:
header = header.read().rstrip('\n').split('\t')
gen_matrix_df = pd.read_csv(
'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
gen_matrix_df.info()
gen_matrix_df.info(memory_usage='deep')
if __name__ == '__main__':
main()
现在让我们使用探查器:
mprof run justpd.py
mprof plot
我们可以看到该图:
和逐行跟踪:
Line # Mem usage Increment Line Contents
================================================
6 54.3 MiB 54.3 MiB @profile
7 def main():
8 54.3 MiB 0.0 MiB with open('genome_matrix_header.txt') as header:
9 54.3 MiB 0.0 MiB header = header.read().rstrip('\n').split('\t')
10
11 2072.0 MiB 2017.7 MiB gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
12
13 2072.0 MiB 0.0 MiB gen_matrix_df.info()
14 2072.0 MiB 0.0 MiB gen_matrix_df.info(memory_usage='deep')
我们可以看到,数据帧在构建时占用〜2 GiB的峰值,在〜3 GiB处.更有趣的是 info
.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB
但是info(memory_usage='deep')
("deep"表示通过查询object
dtype
s来对数据进行深入的内省,请参见下文)给出:
memory usage: 7.9 GB
嗯?!从过程的外部看,我们可以确保memory_profiler
的数字正确. sys.getsizeof
也会为框架显示相同的值(很可能是由于自定义__sizeof__
),使用它来估计分配的gc.get_objects()
的其他工具也会显示相同的值,例如 pympler
.
# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()
赠予:
types | # objects | total size
================================================== | =========== | ============
<class 'pandas.core.series.Series | 34 | 7.93 GB
<class 'list | 7839 | 732.38 KB
<class 'str | 7741 | 550.10 KB
<class 'int | 1810 | 49.66 KB
<class 'dict | 38 | 7.43 KB
<class 'pandas.core.internals.SingleBlockManager | 34 | 3.98 KB
<class 'numpy.ndarray | 34 | 3.19 KB
那么这些7.93 GiB是从哪里来的呢?让我们尝试解释一下.我们有4M行和34列,这为我们提供了134M值.它们是int64
或object
(这是64位指针;请参见使用大数据的熊猫进行详细说明).因此,我们仅对于数据帧中的值具有134 * 10 ** 6 * 8 / 2 ** 20
〜1022 MiB.剩下的〜6.93 GiB呢?
字符串实习
要了解其行为,必须知道Python确实进行了字符串实习.有两篇好文章(一个, PEP 393 在Python 3.3中,C结构已经更改,但是想法是相同的.基本上,每个看起来像标识符的短字符串都将由Python缓存在内部字典中,并且引用将指向相同的Python对象.换句话说,我们可以说它的行为像一个单例.我上面提到的文章解释了内存配置文件的显着改进和性能改进.我们可以检查字符串是否使用 interned
PyASCIIObject
的字段:
import ctypes
class PyASCIIObject(ctypes.Structure):
_fields_ = [
('ob_refcnt', ctypes.c_size_t),
('ob_type', ctypes.py_object),
('length', ctypes.c_ssize_t),
('hash', ctypes.c_int64),
('state', ctypes.c_int32),
('wstr', ctypes.c_wchar_p)
]
然后:
>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0
使用两个字符串,我们还可以进行身份比较(在CPython中通过内存比较来解决).
>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True
因此,关于object
dtype
,数据帧最多分配20个字符串(每个氨基酸一个).不过,值得注意的是,Pandas建议使用类别类型进行枚举. /p>
熊猫的记忆
因此,我们可以像这样解释7.93 GiB的天真估算值:
>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953
请注意,str_size
是58个字节,而不是上面我们看到的1个字符字面量的50个字节.这是因为PEP 393定义了紧凑型和非紧凑型字符串.您可以使用sys.getsizeof(gen_matrix_df.REF[0])
进行检查.
实际内存消耗应为gen_matrix_df.info()
报告的〜1 GiB,是两倍.我们可以假设它与Pandas或NumPy完成的内存(预)分配有关.以下实验表明并非没有道理(多次运行显示了保存图片):
Line # Mem usage Increment Line Contents
================================================
8 53.1 MiB 53.1 MiB @profile
9 def main():
10 53.1 MiB 0.0 MiB with open("genome_matrix_header.txt") as header:
11 53.1 MiB 0.0 MiB header = header.read().rstrip('\n').split('\t')
12
13 2070.9 MiB 2017.8 MiB gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
14 2071.2 MiB 0.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
15 2071.2 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
16 2040.7 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
...
23 1827.1 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
24 1094.7 MiB -732.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
25 1765.9 MiB 671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
26 1094.7 MiB -671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
27 1704.8 MiB 610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
28 1094.7 MiB -610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
29 1643.9 MiB 549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
30 1094.7 MiB -549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
31 1582.8 MiB 488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
32 1094.7 MiB -488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
33 1521.9 MiB 427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
34 1094.7 MiB -427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
35 1460.8 MiB 366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
36 1094.7 MiB -366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
37 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
...
47 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
我想用有关设计问题和未来的新文章作为引言来结束本节Pandas2 的原始作者.
熊猫的经验法则:RAM是数据集大小的5至10倍
进程树
最后,让我们进入存储池,看看是否可以利用写时复制.我们将使用 smemstat
(可从Ubuntu存储库获得) )以估计进程组的内存共享,并 glances
记下系统范围的可用内存.两者都可以编写JSON.
我们将使用Pool(2)
运行原始脚本.我们需要3个终端窗口.
-
smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
-
glances -t 1 --export-json glances.json
-
mprof run -M script.py
然后mprof plot
产生:
总和图(mprof run --nopython --include-children ./script.py
)如下:
请注意,上面的两个图表显示了RSS.假设是由于写时复制不能反映实际的内存使用情况.现在,我们有两个来自smemstat
和glances
的JSON文件.我将使用以下脚本将JSON文件转换为CSV.
#!/usr/bin/env python3
import csv
import sys
import json
def smemstat():
with open('smemstat.json') as f:
smem = json.load(f)
rows = []
fieldnames = set()
for s in smem['smemstat']['periodic-samples']:
row = {}
for ps in s['smem-per-process']:
if 'script.py' in ps['command']:
for k in ('uss', 'pss', 'rss'):
row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20
# smemstat produces empty samples, backfill from previous
if rows:
for k, v in rows[-1].items():
row.setdefault(k, v)
rows.append(row)
fieldnames.update(row.keys())
with open('smemstat.csv', 'w') as out:
dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
dw.writeheader()
list(map(dw.writerow, rows))
def glances():
rows = []
fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
'free', 'mem_critical', 'inactive', 'shared', 'history_size',
'mem_warning', 'total', 'active', 'buffers']
with open('glances.csv', 'w') as out:
dw = csv.DictWriter(out, fieldnames=fieldnames)
dw.writeheader()
with open('glances.json') as f:
for l in f:
d = json.loads(l)
dw.writerow(d['mem'])
if __name__ == '__main__':
globals()[sys.argv[1]]()
首先让我们看一下free
内存.
第一和最小值之间的差异为〜4.15 GiB. PSS的数字如下所示:
总和:
因此,我们可以看到由于写时复制,实际的内存消耗为〜4.15 GiB.但是我们仍在对数据进行序列化,以通过Pool.map
将其发送到辅助进程.我们也可以在这里利用写时复制吗?
共享数据
要使用写时复制,我们需要全局访问list(gen_matrix_df_list.values())
,以便分叉后的工作人员仍然可以读取它.
-
让我们在
main
中的del gen_matrix_df
之后修改代码,如下所示:... global global_gen_matrix_df_values global_gen_matrix_df_values = list(gen_matrix_df_list.values()) del gen_matrix_df_list p = Pool(2) result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values))) ...
- 删除随后要发布的
del gen_matrix_df_list
. -
并修改
matrix_to_vcf
的第一行,例如:def matrix_to_vcf(i): matrix_df = global_gen_matrix_df_values[i]
现在让我们重新运行它.可用内存:
进程树:
及其总和:
因此,我们的实际内存使用量最多约为2.9 GiB(在构建数据帧时主进程达到了峰值),而写时复制有所帮助!
作为附带说明,有一种所谓的读取时复制"功能,即Python参考循环垃圾收集器的行为,gc.freeze" rel ="noreferrer"> issue31558 ).但是gc.disable()
在这种特殊情况下没有影响.
更新
写时复制无副本数据共享的另一种方法是使用示例实现来自 Python中的高性能数据处理演讲.然后,棘手的部分将使熊猫使用mmaped Numpy数组.
I have written the program (below) to:
- read a huge text file as
pandas dataframe
- then
groupby
using a specific column value to split the data and store as list of dataframes. - then pipe the data to
multiprocess Pool.map()
to process each dataframe in parallel.
Everything is fine, the program works well on my small test dataset. But, when I pipe in my large data (about 14 GB), the memory consumption exponentially increases and then freezes the computer or gets killed (in HPC cluster).
I have added codes to clear the memory as soon as the data/variable isn't useful. I am also closing the pool as soon as it is done. Still with 14 GB input I was only expecting 2*14 GB memory burden, but it seems like lot is going on. I also tried to tweak using chunkSize and maxTaskPerChild, etc
but I am not seeing any difference in optimization in both test vs. large file.
I think improvements to this code is/are required at this code position, when I start multiprocessing
.
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
but, I am posting the whole code.
Test example: I created a test file ("genome_matrix_final-chr1234-1mb.txt") of upto 250 mb and ran the program. When I check the system monitor I can see that memory consumption increased by about 6 GB. I am not so clear why so much memory space is taken by 250 mb file plus some outputs. I have shared that file via drop box if it helps in seeing the real problem. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0
Can someone suggest, How I can get rid of the problem?
My python script:
#!/home/bin/python3
import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource
print()
print('Checking required modules')
print()
''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt" # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt" # test file 02
#genome_matrix_file = "genome_matrix_final.txt" # large file
def main():
with open("genome_matrix_header.txt") as header:
header = header.read().rstrip('\n').split('\t')
print()
time01 = time.time()
print('starting time: ', time01)
'''load the genome matrix file onto pandas as dataframe.
This makes is more easy for multiprocessing'''
gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)
# now, group the dataframe by chromosome/contig - so it can be multiprocessed
gen_matrix_df = gen_matrix_df.groupby('CHROM')
# store the splitted dataframes as list of key, values(pandas dataframe) pairs
# this list of dataframe will be used while multiprocessing
gen_matrix_df_list = collections.OrderedDict()
for chr_, data in gen_matrix_df:
gen_matrix_df_list[chr_] = data
# clear memory
del gen_matrix_df
'''Now, pipe each dataframe from the list using map.Pool() '''
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
del gen_matrix_df_list # clear memory
p.close()
p.join()
# concat the results from pool.map() and write it to a file
result_merged = pd.concat(result)
del result # clear memory
pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)
print()
print('completed all process in "%s" sec. ' % (time.time() - time01))
print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
print()
'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):
print()
time02 = time.time()
# index position of the samples in genome matrix file
sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
{'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
{'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
{'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
{'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
{'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
{'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
{'8a': 32, '8b': 17}]
# sample index stored as ordered dictionary
sample_idx_ord_list = []
for ids in sample_idx:
ids = collections.OrderedDict(sorted(ids.items()))
sample_idx_ord_list.append(ids)
# for haplotype file
header = ['contig', 'pos', 'ref', 'alt']
# adding some suffixes "PI" to available sample names
for item in sample_idx_ord_list:
ks_update = ''
for ks in item.keys():
ks_update += ks
header.append(ks_update+'_PI')
header.append(ks_update+'_PG_al')
#final variable store the haplotype data
# write the header lines first
haplotype_output = '\t'.join(header) + '\n'
# to store the value of parsed the line and update the "PI", "PG" value for each sample
updated_line = ''
# read the piped in data back to text like file
matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)
matrix_df = matrix_df.rstrip('\n').split('\n')
for line in matrix_df:
if line.startswith('CHROM'):
continue
line_split = line.split('\t')
chr_ = line_split[0]
ref = line_split[2]
alt = list(set(line_split[3:]))
# remove the alleles "N" missing and "ref" from the alt-alleles
alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))
# if no alt alleles are found, just continue
# - i.e : don't write that line in output file
if len(alt_up) == 0:
continue
#print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
#so, we have data for CHR, POS, REF, ALT so far
# now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
sample_data_for_vcf = []
for ids in sample_idx_ord_list:
sample_data = []
for key, val in ids.items():
sample_value = line_split[val]
sample_data.append(sample_value)
# now, update the phased state for each sample
# also replacing the missing allele i.e "N" and "-" with ref-allele
sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
sample_data_for_vcf.append(str(chr_))
sample_data_for_vcf.append(sample_data)
# add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
# and .. write it to final haplotype file
sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
'\t' + sample_data_for_vcf + '\n'
haplotype_output += updated_line
del matrix_df # clear memory
print('completed haplotype preparation for chromosome/contig "%s" '
'in "%s" sec. ' %(chr_, time.time()-time02))
print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))
# return the data back to the pool
return pd.read_csv(io.StringIO(haplotype_output), sep='\t')
''' to monitor memory '''
def current_mem_usage():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.
if __name__ == '__main__':
main()
Update for bounty hunters:
I have achieved multiprocessing using Pool.map()
but the code is causing a big memory burden (input test file ~ 300 mb, but memory burden is about 6 GB). I was only expecting 3*300 mb memory burden at max.
- Can somebody explain, What is causing such a huge memory requirement for such a small file and for such small length computation.
- Also, i am trying to take the answer and use that to improve multiprocess in my large program. So, addition of any method, module that doesn't change the structure of computation part (CPU bound process) too much should be fine.
- I have included two test files for the test purposes to play with the code.
- The attached code is full code so it should work as intended as it is when copied-pasted. Any changes should be used only to improve optimization in multiprocessing steps.
Prerequisite
In Python (in the following I use 64-bit build of Python 3.6.5) everything is an object. This has its overhead and with
getsizeof
we can see exactly the size of an object in bytes:>>> import sys >>> sys.getsizeof(42) 28 >>> sys.getsizeof('T') 50
- When fork system call used (default on *nix, see
multiprocessing.get_start_method()
) to create a child process, parent's physical memory is not copied and copy-on-write technique is used. - Fork child process will still report full RSS (resident set size) of the parent process. Because of this fact, PSS (proportional set size) is more appropriate metric to estimate memory usage of forking application. Here's an example from the page:
- Process A has 50 KiB of unshared memory
- Process B has 300 KiB of unshared memory
- Both process A and process B have 100 KiB of the same shared memory region
Since the PSS is defined as the sum of the unshared memory of a process and the proportion of memory shared with other processes, the PSS for these two processes are as follows:
- PSS of process A = 50 KiB + (100 KiB / 2) = 100 KiB
- PSS of process B = 300 KiB + (100 KiB / 2) = 350 KiB
The data frame
Not let's look at your DataFrame
alone. memory_profiler
will help us.
justpd.py
#!/usr/bin/env python3
import pandas as pd
from memory_profiler import profile
@profile
def main():
with open('genome_matrix_header.txt') as header:
header = header.read().rstrip('\n').split('\t')
gen_matrix_df = pd.read_csv(
'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
gen_matrix_df.info()
gen_matrix_df.info(memory_usage='deep')
if __name__ == '__main__':
main()
Now let's use the profiler:
mprof run justpd.py
mprof plot
We can see the plot:
and line-by-line trace:
Line # Mem usage Increment Line Contents
================================================
6 54.3 MiB 54.3 MiB @profile
7 def main():
8 54.3 MiB 0.0 MiB with open('genome_matrix_header.txt') as header:
9 54.3 MiB 0.0 MiB header = header.read().rstrip('\n').split('\t')
10
11 2072.0 MiB 2017.7 MiB gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
12
13 2072.0 MiB 0.0 MiB gen_matrix_df.info()
14 2072.0 MiB 0.0 MiB gen_matrix_df.info(memory_usage='deep')
We can see that the data frame takes ~2 GiB with peak at ~3 GiB while it's being built. What's more interesting is the output of info
.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB
But info(memory_usage='deep')
("deep" means introspection of the data deeply by interrogating object
dtype
s, see below) gives:
memory usage: 7.9 GB
Huh?! Looking outside of the process we can make sure that memory_profiler
's figures are correct. sys.getsizeof
also shows the same value for the frame (most probably because of custom __sizeof__
) and so will other tools that use it to estimate allocated gc.get_objects()
, e.g. pympler
.
# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()
Gives:
types | # objects | total size
================================================== | =========== | ============
<class 'pandas.core.series.Series | 34 | 7.93 GB
<class 'list | 7839 | 732.38 KB
<class 'str | 7741 | 550.10 KB
<class 'int | 1810 | 49.66 KB
<class 'dict | 38 | 7.43 KB
<class 'pandas.core.internals.SingleBlockManager | 34 | 3.98 KB
<class 'numpy.ndarray | 34 | 3.19 KB
So where do these 7.93 GiB come from? Let's try to explain this. We have 4M rows and 34 columns, which gives us 134M values. They are either int64
or object
(which is a 64-bit pointer; see using pandas with large data for detailed explanation). Thus we have 134 * 10 ** 6 * 8 / 2 ** 20
~1022 MiB only for values in the data frame. What about the remaining ~ 6.93 GiB?
String interning
To understand the behaviour it's necessary to know that Python does string interning. There are two good articles (one, two) about string interning in Python 2. Besides the Unicode change in Python 3 and PEP 393 in Python 3.3 the C-structures have changed, but the idea is the same. Basically, every short string that looks like an identifier will be cached by Python in an internal dictionary and references will point to the same Python objects. In other word we can say it behaves like a singleton. Articles that I mentioned above explain what significant memory profile and performance improvements it gives. We can check if a string is interned using interned
field of PyASCIIObject
:
import ctypes
class PyASCIIObject(ctypes.Structure):
_fields_ = [
('ob_refcnt', ctypes.c_size_t),
('ob_type', ctypes.py_object),
('length', ctypes.c_ssize_t),
('hash', ctypes.c_int64),
('state', ctypes.c_int32),
('wstr', ctypes.c_wchar_p)
]
Then:
>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0
With two strings we can also do identity comparison (addressed in memory comparison in case of CPython).
>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True
Because of that fact, in regard to object
dtype
, the data frame allocates at most 20 strings (one per amino acids). Though, it's worth noting that Pandas recommends categorical types for enumerations.
Pandas memory
Thus we can explain the naive estimate of 7.93 GiB like:
>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953
Note that str_size
is 58 bytes, not 50 as we've seen above for 1-character literal. It's because PEP 393 defines compact and non-compact strings. You can check it with sys.getsizeof(gen_matrix_df.REF[0])
.
Actual memory consumption should be ~1 GiB as it's reported by gen_matrix_df.info()
, it's twice as much. We can assume it has something to do with memory (pre)allocation done by Pandas or NumPy. The following experiment shows that it's not without reason (multiple runs show the save picture):
Line # Mem usage Increment Line Contents
================================================
8 53.1 MiB 53.1 MiB @profile
9 def main():
10 53.1 MiB 0.0 MiB with open("genome_matrix_header.txt") as header:
11 53.1 MiB 0.0 MiB header = header.read().rstrip('\n').split('\t')
12
13 2070.9 MiB 2017.8 MiB gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
14 2071.2 MiB 0.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
15 2071.2 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
16 2040.7 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
...
23 1827.1 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
24 1094.7 MiB -732.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
25 1765.9 MiB 671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
26 1094.7 MiB -671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
27 1704.8 MiB 610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
28 1094.7 MiB -610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
29 1643.9 MiB 549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
30 1094.7 MiB -549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
31 1582.8 MiB 488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
32 1094.7 MiB -488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
33 1521.9 MiB 427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
34 1094.7 MiB -427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
35 1460.8 MiB 366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
36 1094.7 MiB -366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
37 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
...
47 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
I want to finish this section by a quote from fresh article about design issues and future Pandas2 by original author of Pandas.
pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset
Process tree
Let's come to the pool, finally, and see if can make use of copy-on-write. We'll use smemstat
(available form an Ubuntu repository) to estimate process group memory sharing and glances
to write down system-wide free memory. Both can write JSON.
We'll run original script with Pool(2)
. We'll need 3 terminal windows.
smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
glances -t 1 --export-json glances.json
mprof run -M script.py
Then mprof plot
produces:
The sum chart (mprof run --nopython --include-children ./script.py
) looks like:
Note that two charts above show RSS. The hypothesis is that because of copy-on-write it's doesn't reflect actual memory usage. Now we have two JSON files from smemstat
and glances
. I'll the following script to covert the JSON files to CSV.
#!/usr/bin/env python3
import csv
import sys
import json
def smemstat():
with open('smemstat.json') as f:
smem = json.load(f)
rows = []
fieldnames = set()
for s in smem['smemstat']['periodic-samples']:
row = {}
for ps in s['smem-per-process']:
if 'script.py' in ps['command']:
for k in ('uss', 'pss', 'rss'):
row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20
# smemstat produces empty samples, backfill from previous
if rows:
for k, v in rows[-1].items():
row.setdefault(k, v)
rows.append(row)
fieldnames.update(row.keys())
with open('smemstat.csv', 'w') as out:
dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
dw.writeheader()
list(map(dw.writerow, rows))
def glances():
rows = []
fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
'free', 'mem_critical', 'inactive', 'shared', 'history_size',
'mem_warning', 'total', 'active', 'buffers']
with open('glances.csv', 'w') as out:
dw = csv.DictWriter(out, fieldnames=fieldnames)
dw.writeheader()
with open('glances.json') as f:
for l in f:
d = json.loads(l)
dw.writerow(d['mem'])
if __name__ == '__main__':
globals()[sys.argv[1]]()
First let's look at free
memory.
The difference between first and minimum is ~4.15 GiB. And here is how PSS figures look like:
And the sum:
Thus we can see that because of copy-on-write actual memory consumption is ~4.15 GiB. But we're still serialising data to send it to worker processes via Pool.map
. Can we leverage copy-on-write here as well?
Shared data
To use copy-on-write we need to have the list(gen_matrix_df_list.values())
be accessible globally so the worker after fork can still read it.
Let's modify code after
del gen_matrix_df
inmain
like the following:... global global_gen_matrix_df_values global_gen_matrix_df_values = list(gen_matrix_df_list.values()) del gen_matrix_df_list p = Pool(2) result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values))) ...
- Remove
del gen_matrix_df_list
that goes later. And modify first lines of
matrix_to_vcf
like:def matrix_to_vcf(i): matrix_df = global_gen_matrix_df_values[i]
Now let's re-run it. Free memory:
Process tree:
And its sum:
Thus we're at maximum of ~2.9 GiB of actual memory usage (the peak main process has while building the data frame) and copy-on-write has helped!
As a side note, there's so called copy-on-read, the behaviour of Python's reference cycle garbage collector, described in Instagram Engineering (which led to gc.freeze
in issue31558). But gc.disable()
doesn't have an impact in this particular case.
Update
An alternative to copy-on-write copy-less data sharing can be delegating it to the kernel from the beginning by using numpy.memmap
. Here's an example implementation from High Performance Data Processing in Python talk. The tricky part is then to make Pandas to use the mmaped Numpy array.
这篇关于使用Pool.map()进行多处理时如何解决内存问题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!