Multiprocessing on a large dataset in python (Finding Duplicates)


Problem Description

I have a JSON file that I want to remove duplicate rows from, but it's too large to fit into memory. I found a way to get it done, but my guess is that it's not the best way.

My problem is that it runs in 8 minutes for a 12 GB dataset, but the requirement is to scale the code so that it could run on a 100 GB dataset. Any pointers on how to do this? Should I use multi-threading or multi-processing in Python to achieve this? Or any other method?

Here is the code:

import json
import time

class BusinessService:
    """ This class contains the business logic for identifying the duplicates and creating an output file for further processing """

    @staticmethod
    def service(ipPath, opPath):
        """ The method identifies the duplicates """
        start_time = time.time()  # Start the timer to see how much time the method takes
        uniqueHandleSet = set()   # Set to store the unique handles seen so far
        duplicateHandles = None
        try:
            duplicateHandles = open(opPath, 'w+', encoding='utf-8')  # Output file that collects the duplicate handles
            # Read the JSON-lines file with a 200 MB read buffer, as it is too big to read at once
            with open(ipPath, buffering=200000000, encoding='utf-8') as infile:
                for line in infile:
                    tweetJsonObject = json.loads(line)

                    if tweetJsonObject["name"] not in uniqueHandleSet:
                        uniqueHandleSet.add(tweetJsonObject["name"])
                    else:
                        duplicateHandles.write(line)

            print("--- %s seconds --- memory 200mb while buffering" % (time.time() - start_time))  # Print the total time required to execute
        except Exception as e:
            print("Error:", e)
        finally:
            if duplicateHandles is not None:
                duplicateHandles.close()
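
For reference, a minimal way to invoke this class might look like the following; the file names here are placeholders rather than anything from the original post:

# Hypothetical usage: 'tweets.json' is assumed to be a JSON-lines file with one
# tweet object per line; duplicate lines are written to 'duplicate_handles.json'.
if __name__ == '__main__':
    BusinessService.service('tweets.json', 'duplicate_handles.json')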

Recommended Answer

To scale it, you would need queues for feeding multiple processes and two shared lists to keep track of your results. The main idea is to feed the file line by line to a queue that is subsequently processed by some consumer processes. These processes, however, share two lists to store the intermediate results. The Manager is responsible for the synchronization between the processes.

The following code is just a rough guideline, not really tested:

from multiprocessing import Process, Manager, Queue

def findDuplicate(inputQueue, uniqueValues, duplicates):
    for line in iter(inputQueue.get, 'STOP'): #get line from Queue, stop if 'STOP' is received
        if line not in uniqueValues: # check if duplicate
            uniqueValues.append(line)
        else:
            duplicates.append(line) # store it

manager = Manager() # get a new SyncManager
uniqueValues = manager.list() # handle for shared list
duplicates = manager.list() # a 2nd handle for a shared list
inputQueue = Queue() # a queue to provide tasks to the processes

# setup workers, provide shared lists and tasks
numProc = 4
process = [Process(target=findDuplicate,
                      args=(inputQueue, uniqueValues, duplicates)) for x in range(numProc)]

# start processes, they will idle if nothing is in queue
for p in process:
    p.start()

with open(ipPath) as f:
    for line in f:
        inputQueue.put(line, block=True) # put line in queue, only if a free slot is available
for p in process:
    inputQueue.put('STOP') # signal workers to stop as no further input

# wait for all processes to finish
for p in process:
    p.join()
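
As a follow-up, here is a minimal sketch of how that snippet could be wrapped up into a runnable script. It assumes a spawn-based start method (e.g. on Windows, where the `if __name__ == '__main__':` guard is required), a placeholder input path 'tweets.json', and that deduplication happens on the parsed "name" field as in the question's code rather than on the raw line:

import json
from multiprocessing import Process, Manager, Queue

def findDuplicate(inputQueue, uniqueValues, duplicates):
    for line in iter(inputQueue.get, 'STOP'):
        name = json.loads(line)["name"]  # dedup on the handle, as in the question's code
        if name not in uniqueValues:
            uniqueValues.append(name)
        else:
            duplicates.append(line)

if __name__ == '__main__':  # required when the start method is spawn
    manager = Manager()
    uniqueValues = manager.list()
    duplicates = manager.list()
    inputQueue = Queue()

    workers = [Process(target=findDuplicate,
                       args=(inputQueue, uniqueValues, duplicates)) for _ in range(4)]
    for p in workers:
        p.start()

    with open('tweets.json', encoding='utf-8') as f:  # placeholder input path
        for line in f:
            inputQueue.put(line)
    for p in workers:
        inputQueue.put('STOP')
    for p in workers:
        p.join()

    # write the collected duplicate lines to a placeholder output file
    with open('duplicate_handles.json', 'w', encoding='utf-8') as out:
        out.writelines(duplicates)

Note that every membership check on a Manager list goes through the manager process, so this is still a sketch to start from rather than a tuned solution for a 100 GB dataset.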
