Read, format, then write large CSV files


Problem description

I have fairly large csv files that I need to manipulate/amend line-by-line (as each line may require different amending rules), then write them out to another csv with the proper formatting.

Currently, I have:

import csv
import multiprocessing

def read(buffer):
    pool = multiprocessing.Pool(4)
    with open("/path/to/file.csv", 'r') as f:
        while True:
            lines = pool.map(format_data, f.readlines(buffer))
            if not lines:
                break
            yield lines

def format_data(row):
    row = row.split(',') # Because readlines() returns a string
    # Do formatting via list comprehension
    return row

def main():
    buf = 65535
    rows = read(buf)
    with open("/path/to/new.csv",'w') as out:
        writer = csv.writer(out, lineterminator='\n')
        while rows:
            try:
                writer.writerows(next(rows))
            except StopIteration:
                break

Even though I'm using multiprocessing via map and preventing memory overload with a generator, it still takes me well over 2 minutes to process 40,000 lines. It honestly shouldn't take that long. I've even built a nested list from the generator output and tried to write the data out in one big go instead of chunk by chunk, and it still takes just as long. What am I doing wrong here?

Answer

I've figured it out.

First, the issue was in my format_data() function: it was making a database call, and every time it ran it opened a new database connection and closed it again, on every single iteration.

I fixed it by creating a basic mapping via a dictionary, which gives a far faster lookup table and works fine with the multiprocessing pool.
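
The original answer doesn't include the lookup code itself. A minimal sketch of the idea, assuming a SQLite source and a made-up table/column layout (the names, paths, and the remapped column index below are purely illustrative), could look like this:

import sqlite3  # stand-in for whatever database was actually being queried

def build_lookup(db_path):
    # Open the database connection once, up front, instead of once per row
    # inside format_data().
    conn = sqlite3.connect(db_path)
    try:
        # 'codes' and its columns are assumptions made for this sketch.
        return dict(conn.execute("SELECT code, label FROM codes"))
    finally:
        conn.close()

# Built once per process (at import time), not once per row.
LOOKUP = build_lookup("/path/to/lookup.db")

def format_data(line):
    row = line.rstrip('\n').split(',')
    # Replace the per-row database round trip with an in-memory dict lookup.
    row[2] = LOOKUP.get(row[2], row[2])
    return row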

So, my code is as follows:

import csv
import multiprocessing

def read(buffer):
    pool = multiprocessing.Pool(4)
    with open("/path/to/file.csv", 'r') as f:
        while True:
            lines = pool.map(format_data, f.readlines(buffer))
            if not lines:
                break
            yield lines

def format_data(row):
    row = row.split(',') # Because readlines() returns a string
    # Do formatting via list comprehension AND a dictionary lookup
    # vice a database connection
    return row

def main():
    rows = read(1024*1024)
    with open("/path/to/new.csv",'w') as out:
        while rows:
            try:
                csv.writer(f, lineterminator='\n').writerows(next(rows))
            except StopIteration:
                break

I was able to parse a ~150 MB file in under 30 seconds. Hopefully there are some lessons learned here that others can benefit from.
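
As a closing usage note (not part of the original answer): because read() is a generator, the while/StopIteration pattern in main() can also be written as a plain for loop, and a __main__ guard keeps the multiprocessing pool safe on platforms that spawn workers by re-importing the module. A small sketch, reusing the read() defined above:

import csv

def main():
    with open("/path/to/new.csv", 'w') as out:
        writer = csv.writer(out, lineterminator='\n')
        # Iterating the generator directly ends cleanly once read() is
        # exhausted, so there is no need to catch StopIteration by hand.
        for chunk in read(1024*1024):
            writer.writerows(chunk)

if __name__ == '__main__':
    main()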

