Splitting a CSV file into equal parts?

Question

I have a large CSV file that I would like to split into a number of pieces equal to the number of CPU cores in the system. I then want to use multiprocessing to have all the cores work on the file together. However, I am having trouble even splitting the file into parts. I've searched all over Google and found some sample code that appears to do what I want. Here is what I have so far:

import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_cpus=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    total_file_size = os.path.getsize(infilename)
    print total_file_size
    files = list()
    with open(infilename, 'rb') as infile:
        for i in xrange(num_cpus):
            files.append(tempfile.TemporaryFile())
            this_file_size = 0
            while this_file_size < 1.0 * total_file_size / num_cpus:
                files[-1].write(infile.read(READ_BUFFER))
                this_file_size += READ_BUFFER
        files[-1].write(infile.readline()) # get the possible remainder
        files[-1].seek(0, 0)
    return files

files = split("sample_simple.csv")
print len(files)

for ifile in files:
    reader = csv.reader(ifile)
    for row in reader:
        print row

The two prints show the correct file size and that it was split into 4 pieces (my system has 4 CPU cores).

However, the last section of the code that prints all the rows in each of the pieces gives the error:

for row in reader:
_csv.Error: line contains NULL byte

I tried printing the rows without running the split function and it prints all the values correctly. I suspect the split function has added some NULL bytes to the resulting 4 file pieces but I'm not sure why.

Does anyone know if this is a correct and fast method to split the file? I just want the resulting pieces to be readable by csv.reader.

Answer

As I said in a comment, csv files would need to be split on row (or line) boundaries. Your code doesn't do this and potentially breaks them up somewhere in the middle of one — which I suspect is the cause of your _csv.Error.

The following avoids doing that by processing the input file as a series of lines. I've tested it, and it seems to work standalone in the sense that it divides the sample file into approximately equal-size chunks; the chunks are only approximately equal because it's unlikely that a whole number of rows will fit exactly into a chunk.

Update

This is a substantially faster version of the code than I originally posted. The improvement is that it now uses the temp file's own tell() method to determine the constantly changing length of the file as it's being written, instead of calling os.path.getsize(), which eliminates the need to flush() the file and call os.fsync() on it after each row is written.

import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_chunks=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'in_file_size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'target chunk_size:', chunk_size
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    temp_file.write(infile.next())
                except StopIteration:  # end of infile
                    break
            temp_file.seek(0)  # rewind
            files.append(temp_file)
    return files

files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))

for i, ifile in enumerate(files, start=1):
    print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
    print 'contents of file {}:'.format(i)
    reader = csv.reader(ifile)
    for row in reader:
        print row
    print ''
