在Python中使用多线程的For Loop对象列表 [英] For Loop for list of Objects with use Multithread in Python

查看:105
本文介绍了在Python中使用多线程的For Loop对象列表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Python的新手,我有一个程序可以加载一个大CSV文件,该文件超过10万行,每行有4列. 在FOR循环中,我为每一行检查相同的重复列表( dlist ),此 dlist DRef 类的对象列表,我将其与其他对象一起加载功能

I am new in Python, I had a program which loads one big CSV file where is over 100k lines, each line had 4 columns. In FOR loop I check for each row same duplicated list (dlist), this dlist is list of objects of DRef class which I load with another function

DsRef类:

from tqdm import tqdm
from multiprocessing import Pool, cpu_count, freeze_support

class DsRef:
    def __init__(self, pn, comp, comp_name, type, diff):
        self.pn = pn
        self.comp = comp
        self.comp_name = comp_name
        self.type = type
        self.diff = diff

    def __str__(self):
        return f'{self.pn} {get_red("|")} {self.comp} {get_red("|")} {self.comp_name} {get_red("|")} {self.type} {get_red("|")} {self.diff}\n'

    def __repr__(self):
        return str(self)  

    def __iter__(self):
        return iter(self.__dict__.items())

复制类别:

class Duplication:
    def __init__(self, pn, comp, cnt):
        self.pn = pn
        self.comp = comp
        self.cnt = cnt

    def __str__(self):
        return f'{self.pn};{self.comp};{self.cnt}\n'

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(('pn', self.pn,
                 'comp', self.comp))

    def __eq__(self, other):
        return self.pn == other.pn and self.comp == other.comp 

加载数据文件样本进行测试:

dlist= []
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XCX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VCV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XYX", "CCC_YYY", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TAT_XQX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "ATT_XXX", "CCC_VQV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XWX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_XXX", "CCC_VWV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
                    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))

用于查找并返回重复值的行的方法:

def FindDuplications(dlist):
    duplicates = []
    for pn, comp in enumerate(dlist):            
        matches = [xpn for xpn, xcomp in enumerate(dlist) if pn == xpn and comp == xcomp]
        duplicates.append(Duplication(pn, comp, len(matches)))
    return duplicates

row.pn == x.pn and row.comp == x.comp如果为真,我发现重复了,我将每个对象的前两个参数与列表中的每个对象进行比较

row.pn == x.pn and row.comp == x.comp if its true I find a duplication I compare first 2 parameters of each objech with each object in list

现在,我尝试使用类似的方法将所有处理器用于更快的结果,但是现在需要15分钟以上的时间

Now I try to use something like that for use all processor for a faster result, now it takes over 15 minutes

if __name__ == '__main__':
    freeze_support()
    p = Pool(cpu_count())
    duplicates = p.map(FindDuplications, dlist)
    p.close()
    p.join()

首先,当Class不可迭代时出现错误,然后为第一类创建 iter 函数,此后,我得到了一个错误,则元组对象不知道 pn comp 参数,那么我使用in进行枚举(dlist),但仍然无法正常工作

In first I got an error when Class is not iterable then I create iter functions for first class, after that, I got an error then tuple object does not know pn or comp parameter, then I use in for enumerate(dlist) but still does not work

能请你帮我吗?

我还想使用TQDM检查处理功能的进度以查找重复项

I would like also use TQDM to check the progress of processing function to find duplications

有一个原始的工作功能,未使用多线程处理:

there is an original working function without use Multithreading:

def CheckDuplications(dlist):
    print(get_yellow("========= CHECK CROSS DUPLICATIONS ========="))
    duplicates = []
    for r in tqdm(dlist):
        matches = [x for x in dlist if r.pn == x.pn and r.comp == x.comp]
        duplicates.append(Duplication(r.pn, r.comp, len(matches)))

    results = [d for d in duplicates if d.cnt > 1]
    results = set(results)
    return results

从函数 FindDuplications 中,我获得了DsRef对象的列表(简单副本),但这必须返回Duplicate对象的列表,这是错误的

From function FindDuplications I got list of DsRef objects (simple copy), but this must return list of Duplication objects, something is wrong

谢谢

推荐答案

代码中有一些麻烦,您没有并行处理,不能仅仅在多个内核上运行繁重任务的单线程代码.该代码需要一些采用.

There were a few troubles in the code, you didn't parallel it, you can't just run one-thread code with a heavy task on multiple cores. The code requires some adopts.

好吧,无论如何,我们在这里:)

Ok, anyway, here we are :)

from math import ceil
from multiprocessing import Pool, cpu_count, freeze_support


def get_red(val):
    return val


class DsRef:
    def __init__(self, pn, comp, comp_name, type, diff):
        self.pn = pn
        self.comp = comp
        self.comp_name = comp_name
        self.type = type
        self.diff = diff

    def __str__(self):
        return f'{self.pn} {get_red("|")} {self.comp} {get_red("|")} {self.comp_name} {get_red("|")} {self.type} {get_red("|")} {self.diff}\n'

    def __repr__(self):
        return str(self)


class Duplication:
    def __init__(self, pn, comp, cnt):
        self.pn = pn
        self.comp = comp
        self.cnt = cnt

    def __str__(self):
        return f'{self.pn};{self.comp};{self.cnt}\n'

    def __repr__(self):
        return str(self)

    def __hash__(self):
        return hash(('pn', self.pn,
                     'comp', self.comp))

    def __eq__(self, other):
        return self.pn == other.pn and self.comp == other.comp


dlist = []
dlist.append(DsRef(
    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XCX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XXX", "CCC_VCV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XXX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XYX", "CCC_YYY", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TAT_XQX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "ATT_XXX", "CCC_VQV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XWX", "CCC_VVV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_XXX", "CCC_VWV", "CTYPE", "CTYPE", "text"))
dlist.append(DsRef(
    "TTT_EEE", "CCC_VVV", "CTYPE", "CTYPE", "text"))


def FindDuplications(task):
    dlist, start, count = task

    duplicates = []
    for r in dlist[start:start + count]:
        matches = [x for x in dlist if r.pn == x.pn and r.comp == x.comp]
        duplicates.append(Duplication(r.pn, r.comp, len(matches)))

    return {d for d in duplicates if d.cnt > 1}


if __name__ == '__main__':
    freeze_support()

    threads = cpu_count()
    tasks_per_thread = ceil(len(dlist) / threads)

    tasks = [(dlist, tasks_per_thread * i, tasks_per_thread) for i in range(threads)]

    p = Pool(threads)
    duplicates = p.map(FindDuplications, tasks)
    p.close()
    p.join()

    duplicates = {item for sublist in duplicates for item in sublist}

    print(duplicates)
    print(type(duplicates))

它对我来说效果很好,并返回与单线程函数相同的结果,并且可以在所有可用内核中并行工作.

It works well for me and returns the same results as one-thread function and works in all available cores in parallel.

输出

python test.py
{TTT_EEE;CCC_VVV;2
, TTT_XXX;CCC_VVV;2
}
<class 'set'>

这篇关于在Python中使用多线程的For Loop对象列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆