How to use multiprocessing module to iterate a list and match it with a key in dictionary?

Question

I have a list named master_lst created from a CSV file using the following code:

import sys

infile = open(sys.argv[1], "r")
lines = infile.readlines()[1:]          # skip the header row
master_lst = ["read"]
for line in lines:
    line = line.strip().split(',')
    fourth_field = line[3]              # the sequence is in the fourth column
    master_lst.append(fourth_field)
infile.close()
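For what it's worth, the same column extraction can be written with the csv module, which also handles quoted fields containing commas. A minimal sketch, assuming the same layout (one header row, sequence in the fourth column):

import csv
import sys

# Sketch: collect the fourth column of every data row into master_lst.
with open(sys.argv[1], "rb") as csvfile:    # "rb" mode for the csv module on Python 2
    reader = csv.reader(csvfile)
    next(reader)                            # skip the header row
    master_lst = ["read"] + [row[3] for row in reader]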

This master list has the unique set of sequences. Now I have to loop through 30 collapsed FASTA files to count the number of occurrences of each of these sequences in the master list. The format of the 30 files is as follows:

>AAAAAAAAAAAAAAA
7451
>AAAAAAAAAAAAAAAA
4133
>AAAAAAAAAAAAAAAAA
2783

For counting the number of occurrences, I looped through each of the 30 files and created a dictionary with sequences as keys and numbers of occurrences as values. Then I iterated over each element of master_lst and matched it with a key in the dictionary created in the previous step. If there is a match, I appended the value of the key to a new list (ind_lst); if not, I appended 0 to ind_lst. The code for that is as follows:

for file in files:
    ind_lst = []
    if file.endswith('.fa'):
        first = file.split(".")
        first_field = first[0]              # file name without the extension
        ind_lst.append(first_field)
        fasta = open(file)
        individual_dict = {}
        for line in fasta:
            line = line.strip()
            if line == '':
                continue
            if line.startswith('>'):        # header line holds the sequence
                header = line.lstrip('>')
                individual_dict[header] = ''
            else:                           # the next line holds its count
                individual_dict[header] += line
        fasta.close()
        for key in master_lst[1:]:
            a = 0
            if key in individual_dict.keys():   # .keys() builds a list on Python 2; see the note below
                a = individual_dict[key]
            ind_lst.append(a)
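As an aside, much of the slowdown likely comes from key in individual_dict.keys(): on Python 2 that builds a list of all the keys and scans it linearly on every lookup. Testing membership on the dictionary itself, or using dict.get, is a constant-time hash lookup instead. A minimal sketch of the inner loop with that fix:

get = individual_dict.get          # bind the method once
for key in master_lst[1:]:
    ind_lst.append(get(key, 0))    # the stored count, or 0 if the sequence is absent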

Then I write master_lst and each ind_lst to a CSV file using the code explained here: How to append a new list to an existing CSV file?
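One way to write those lists out as columns is a hedged sketch along these lines (all_lsts is a hypothetical name for the collected lists, master_lst first followed by one ind_lst per file):

import csv

# Sketch: zip(*...) pairs the column lists element-wise to form the output rows.
with open('output.csv', 'wb') as outfile:   # 'wb' for the csv module on Python 2
    writer = csv.writer(outfile)
    writer.writerows(zip(*all_lsts))        # all_lsts = [master_lst, ind_lst_1, ...] (assumed)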

The final output should look like this:

Read                            file1     file2     ...     file30
AAAAAAAAAAAAAAA                 7451      4456
AAAAAAAAAAAAAAAA                4133      3624
AAAAAAAAAAAAAAAAA               2783      7012

This code works perfectly fine when I use a smaller master_lst, but when the size of master_lst increases the execution time increases far too much. The master_lst I am working with right now has 35,718,501 sequences (elements). When I subset just 50 sequences and run the code, the script takes 2 hours to execute, so for 35,718,501 sequences it will take forever to complete.

Now I don't know how to speed up the script. I am not quite sure whether there are improvements that could make it execute in a shorter time. I am running my script on a Linux server which has 16 CPU cores. When I use the command top, I can see that the script uses only one CPU. I am not an expert in Python and I don't know how to make it run on all available CPU cores using the multiprocessing module. I checked this webpage: Learning Python's Multiprocessing Module.

But I wasn't quite sure what should go under def and if __name__ == '__main__':, and I am also not sure what arguments I should pass to the function. I was getting an error when I tried the first code from Douglas without passing any arguments, as follows:

  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run

self._target(*self._args, **self._kwargs)
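For reference, the minimal shape of a multiprocessing script is a module-level worker function plus an if __name__ == '__main__': guard; a traceback like the one above is typically raised when the target function is called with arguments that don't match its signature. A hedged sketch with illustrative names:

from multiprocessing import Pool

def count_one_file(filename):          # worker: must be defined at module level
    return filename                    # placeholder for the real per-file work

if __name__ == '__main__':             # guard so child processes don't re-run this
    pool = Pool()                      # defaults to one worker per CPU core
    results = pool.map(count_one_file, ['file1.fa', 'file2.fa'])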

I have been working on this for the last few days and haven't been successful in generating my desired output. If anyone can suggest an alternative code that could run fast, or how to run this code on multiple CPUs, that would be awesome. Any help to resolve this issue would be much appreciated.

Answer

Here's a multiprocessing version. It uses a slightly different approach from the one in your code, which does away with the need to create the ind_lst.

The essence of the difference is that it first produces a transpose of the desired data, and then transposes that into the desired result.

In other words, instead of creating this directly:

Read,file1,file2
AAAAAAAAAAAAAAA,7451,4456
AAAAAAAAAAAAAAAA,4133,3624
AAAAAAAAAAAAAAAAA,2783,7012

it first produces this:

Read,AAAAAAAAAAAAAAA,AAAAAAAAAAAAAAAA,AAAAAAAAAAAAAAAAA 
file1,7451,4133,2783
file2,4456,3624,7012

...and then transposes that with the built-in zip() function to obtain the desired format.
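A quick demonstration of that transpose with a trimmed version of the data above:

rows = [['Read', 'AAAAAAAAAAAAAAA', 'AAAAAAAAAAAAAAAA'],
        ['file1', 7451, 4133],
        ['file2', 4456, 3624]]
print(zip(*rows))   # Python 2: zip() returns a list
# [('Read', 'file1', 'file2'), ('AAAAAAAAAAAAAAA', 7451, 4456), ('AAAAAAAAAAAAAAAA', 4133, 3624)]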

Besides not needing to create the ind_lst, it also allows the creation of one row of data per file rather than one column (which is easier and more efficient).

Here's the code:

from __future__ import print_function  # Python 2 code throughout

import csv
from functools import partial
from glob import glob
import operator
import os
from multiprocessing import Pool

def get_master_list(filename):
    with open(filename, "rb") as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # ignore first row
        sequence_getter = operator.itemgetter(3)  # retrieves fourth column of each row
        return map(sequence_getter, reader)

def process_fa_file(master_list, filename):
    fa_dict = {}
    with open(filename) as fa_file:
        for line in fa_file:
            line = line.strip()
            if not line:
                continue                        # skip any blank lines
            if line[0] == '>':
                sequence = line[1:]             # header line: the sequence itself
            else:
                fa_dict[sequence] = int(line)   # count line for the preceding sequence

    get = fa_dict.get  # local var to expedite access
    basename = os.path.basename(os.path.splitext(filename)[0])
    return [basename] + [get(key, 0) for key in master_list]

def process_fa_files(master_list, filenames):
    pool = Pool(processes=4)  # "processes" is the number of worker processes to
                              # use. If processes is None then the number returned
                              # by cpu_count() is used.
    # Only one argument can be passed to the target function using Pool.map(),
    # so create a partial to pass first argument, which doesn't vary.
    results = pool.map(partial(process_fa_file, master_list), filenames)
    header_row = ['Read'] + master_list
    return [header_row] + results

if __name__ == '__main__':
    master_list = get_master_list('master_list.csv')

    fa_files_dir = '.'  # current directory
    filenames = glob(os.path.join(fa_files_dir, '*.fa'))

    data = process_fa_files(master_list, filenames)

    rows = zip(*data)  # transpose
    with open('output.csv', 'wb') as outfile:
        writer = csv.writer(outfile)
        writer.writerows(rows)

    # show data written to file
    for row in rows:
        print(','.join(map(str, row)))
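The functools.partial trick deserves a note: Pool.map() passes each worker exactly one item from the iterable, so the constant master_list argument has to be baked into the function beforehand. In general:

from functools import partial

def add(a, b):
    return a + b

add_five = partial(add, 5)   # fixes the first positional argument
print(add_five(3))           # prints 8, equivalent to add(5, 3)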
