How to use multiprocessing module to iterate a list and match it with a key in dictionary?
Question
I have a list named master_lst created from a CSV file using the following code:
infile = open(sys.argv[1], "r")
lines = infile.readlines()[1:]
master_lst = ["read"]
for line in lines:
    line = line.strip().split(',')
    fourth_field = line[3]
    master_lst.append(fourth_field)
This master list has the unique set of sequences. Now I have to loop through 30 collapsed FASTA files to count the number of occurrences of each of these sequences in the master list. The format of the 30 files is as follows:
>AAAAAAAAAAAAAAA
7451
>AAAAAAAAAAAAAAAA
4133
>AAAAAAAAAAAAAAAAA
2783
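For reference, a minimal sketch of turning one such collapsed FASTA block into a sequence-to-count dictionary (the sample string is made-up data in the format shown above):

```python
# Made-up sample in the collapsed FASTA format: a '>' header line holding
# the sequence, followed by a line holding its count.
sample = """\
>AAAAAAAAAAAAAAA
7451
>AAAAAAAAAAAAAAAA
4133
"""

counts = {}
for line in sample.splitlines():
    line = line.strip()
    if not line:
        continue
    if line.startswith('>'):
        header = line[1:]           # the sequence itself is the header
    else:
        counts[header] = int(line)  # the count follows on the next line

print(counts)
```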
To count the number of occurrences, I looped through each of the 30 files and created a dictionary with the sequences as keys and the numbers of occurrences as values. Then I iterated over each element of master_lst and matched it against the keys in the dictionary created in the previous step. If there was a match, I appended the key's value to a new list (ind_lst); if not, I appended 0 to ind_lst. The code for that is as follows:
for file in files:
    ind_lst = []
    if file.endswith('.fa'):
        first = file.split(".")
        first_field = first[0]
        ind_lst.append(first_field)
        fasta = open(file)
        individual_dict = {}
        for line in fasta:
            line = line.strip()
            if line == '':
                continue
            if line.startswith('>'):
                header = line.lstrip('>')
                individual_dict[header] = ''
            else:
                individual_dict[header] += line
        for key in master_lst[1:]:
            a = 0
            if key in individual_dict.keys():
                a = individual_dict[key]
            else:
                a = 0
            ind_lst.append(a)
Then I write master_lst and ind_lst to a CSV file using the code explained here: How to append a new list to an existing CSV file?
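That linked approach can be sketched like this (a minimal Python 3 sketch with made-up data; master_lst and columns here are hypothetical stand-ins for the real lists): collect one list per file, then zip them against master_lst so each sequence becomes one CSV row.

```python
import csv

# Hypothetical stand-ins: master_lst holds the sequences (with the 'read'
# header first); columns holds one per-file list whose first element is
# the file name, matching the structure of ind_lst.
master_lst = ['read', 'AAA', 'AAAA']
columns = [['file1', 7451, 4133],
           ['file2', 4456, 3624]]

# zip() aligns the i-th entry of every list, turning the per-file
# columns into per-sequence rows.
rows = list(zip(master_lst, *columns))

with open('output.csv', 'w', newline='') as outfile:
    csv.writer(outfile).writerows(rows)
```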
The final output should look like this:
Read               file1  file2  ...  file30
AAAAAAAAAAAAAAA    7451   4456
AAAAAAAAAAAAAAAA   4133   3624
AAAAAAAAAAAAAAAAA  2783   7012
This code works perfectly fine when I use a smaller master_lst, but when the size of master_lst increases, the execution time increases too much. The master_lst I am working with right now has 35,718,501 sequences (elements). When I subset 50 sequences and run the code, the script takes 2 hours to execute, so for 35,718,501 sequences it would take forever to complete.
Now I don't know how to speed up the script, and I am not quite sure what improvements could be made to it so that it executes in a shorter time. I am running my script on a Linux server that has 16 CPU cores. When I use the command top, I can see that the script uses only one CPU. But I am not an expert in Python and I don't know how to make it run on all available CPU cores using the multiprocessing module. I checked this webpage: Learning Python's Multiprocessing Module.
But I wasn't quite sure what should come under def and if __name__ == '__main__':. I am also not quite sure what arguments I should pass to the function. I was getting an error when I tried the first code from Douglas without passing any arguments, as follows:
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
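For the def / if __name__ == '__main__': question, the usual pattern is: the per-item work goes in a module-level function, and the Pool is created under the guard so that child processes can import the module safely. A minimal sketch, where count_in_file is a hypothetical placeholder rather than working counting code:

```python
from multiprocessing import Pool

def count_in_file(filename):
    # Hypothetical worker: in the real script this would open `filename`,
    # build the sequence->count dict, and return one row of counts.
    # Pool.map() calls it once per element of the iterable, passing that
    # element as the single argument.
    return filename.upper()  # placeholder result

if __name__ == '__main__':
    filenames = ['file1.fa', 'file2.fa']
    pool = Pool()  # defaults to one worker per CPU core
    results = pool.map(count_in_file, filenames)
    pool.close()
    pool.join()
    print(results)
```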
I have been working on this for the last few days and I haven't been successful in generating my desired output. If anyone can suggest an alternative code that could run fast, or suggest how to run this code on multiple CPUs, that would be awesome. Any help to resolve this issue would be much appreciated.
Answer
Here's a multiprocessing version. It uses a slightly different approach than your code does, one which does away with the need for creating ind_lst.
The essence of the difference is that it first produces a transpose of the desired data, and then transposes that into the desired result.
In other words, instead of creating this directly:
Read,file1,file2
AAAAAAAAAAAAAAA,7451,4456
AAAAAAAAAAAAAAAA,4133,3624
AAAAAAAAAAAAAAAAA,2783,7012
it first produces this:
Read,AAAAAAAAAAAAAAA,AAAAAAAAAAAAAAAA,AAAAAAAAAAAAAAAAA
file1,7451,4133,2783
file2,4456,3624,7012
...and then transposes that with the built-in zip() function to obtain the desired format.
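The transpose step itself is just zip() with argument unpacking; a small illustration with the numbers from the example above:

```python
# Rows of the intermediate table (one per file) become the columns of
# the final table, and vice versa.
intermediate = [
    ['Read', 'AAAAAAAAAAAAAAA', 'AAAAAAAAAAAAAAAA'],
    ['file1', 7451, 4133],
    ['file2', 4456, 3624],
]
transposed = list(zip(*intermediate))
print(transposed[0])  # ('Read', 'file1', 'file2')
print(transposed[1])  # ('AAAAAAAAAAAAAAA', 7451, 4456)
```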
Besides not needing to create ind_lst, it also allows the creation of one row of data per file rather than one column (which is easier and can be done more efficiently with less effort).
Here's the code:
from __future__ import print_function
import csv
from functools import partial
from glob import glob
import operator
import os
from multiprocessing import cpu_count, Pool

def get_master_list(filename):
    with open(filename, "rb") as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # ignore first row
        sequence_getter = operator.itemgetter(3)  # retrieves fourth column of each row
        return map(sequence_getter, reader)

def process_fa_file(master_list, filename):
    fa_dict = {}
    with open(filename) as fa_file:
        for line in fa_file:
            if line and line[0] != '>':
                fa_dict[sequence] = int(line)
            elif line:
                sequence = line[1:-1]
    get = fa_dict.get  # local var to expedite access
    basename = os.path.basename(os.path.splitext(filename)[0])
    return [basename] + [get(key, 0) for key in master_list]

def process_fa_files(master_list, filenames):
    pool = Pool(processes=4)  # "processes" is the number of worker processes to
                              # use. If processes is None then the number returned
                              # by cpu_count() is used.
    # Only one argument can be passed to the target function using Pool.map(),
    # so create a partial to pass the first argument, which doesn't vary.
    results = pool.map(partial(process_fa_file, master_list), filenames)
    header_row = ['Read'] + master_list
    return [header_row] + results

if __name__ == '__main__':
    master_list = get_master_list('master_list.csv')
    fa_files_dir = '.'  # current directory
    filenames = glob(os.path.join(fa_files_dir, '*.fa'))
    data = process_fa_files(master_list, filenames)
    rows = zip(*data)  # transpose
    with open('output.csv', 'wb') as outfile:
        writer = csv.writer(outfile)
        writer.writerows(rows)
    # show data written to file
    for row in rows:
        print(','.join(map(str, row)))