Efficient way to read 15M-line CSV files in Python


Problem description

For my application, I need to read multiple files with 15 million lines each, store them in a DataFrame, and save the DataFrame in HDF5 format.

I've already tried different approaches, notably pandas.read_csv with chunksize and dtype specifications, and dask.dataframe. Both take around 90 seconds to process a single file, so I'd like to know whether there is a more efficient way to handle these files. The code below shows some of the tests I've done.

import pandas as pd
import dask.dataframe as dd
import numpy as np
import re

# 'file' and 'dfname' are placeholders for the input path and the HDF5 key

# First approach
store = pd.HDFStore('files_DFs.h5')

chunk_size = 1_000_000  # chunksize must be an integer

df_chunk = pd.read_csv(file,
                sep="\t",
                chunksize=chunk_size,
                usecols=['a', 'b'],
                converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                            "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
                skiprows=15
           )
chunk_list = []

for chunk in df_chunk:
    chunk_list.append(chunk)

df = pd.concat(chunk_list, ignore_index=True)

store[dfname] = df
store.close()

# Second approach

df = dd.read_csv(
        file,
        sep="\t",
        usecols=['a', 'b'],
        converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
        skiprows=15
     )
store = pd.HDFStore('files_DFs.h5')  # re-open the store closed above
store.put(dfname, df.compute())
store.close()

Here is what the files look like (the whitespace is a literal tab):

a   b
599.998413  14.142895
599.998413  20.105534
599.998413  6.553850
599.998474  27.116098
599.998474  13.060312
599.998474  13.766775
599.998596  1.826706
599.998596  18.275938
599.998718  20.797491
599.998718  6.132450)
599.998718  41.646194
599.998779  19.145775
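
For reference, the dtype-based variant mentioned above would look roughly like the sketch below; it reuses the placeholder file and columns 'a'/'b' from the code above, and it only helps when the columns already parse cleanly as floats, which is why the converters were used instead.

import numpy as np
import pandas as pd

# Sketch of the dtype-only variant: let the C parser cast straight to float32,
# with no per-cell converter lambdas ('file' is the same placeholder as above).
df = pd.read_csv(file,
                 sep="\t",
                 usecols=['a', 'b'],
                 dtype={'a': np.float32, 'b': np.float32},
                 skiprows=15)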

Answer

First, let's answer the title of the question.

I recommend using modin:

import modin.pandas as mpd
import pandas as pd
import numpy as np

frame_data = np.random.randint(0, 10_000_000, size=(15_000_000, 2)) 
pd.DataFrame(frame_data*0.0001).to_csv('15mil.csv', header=False)

!wc 15mil*.csv ; du -h 15mil*.csv

    15000000   15000000  480696661 15mil.csv
    459M    15mil.csv

Now to the benchmarks:

%%timeit -r 3 -n 1 -t
global df1
df1 = pd.read_csv('15mil.csv', header=None)
    9.7 s ± 95.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
    3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)

(df2.values == df1.values).all()
    True

So, as we can see, modin was approximately 3 times faster on my setup.

Now, to answer your specific problem.

As people have noted, your bottleneck is probably the converters. You are calling those lambdas 30 million times, and even the function-call overhead becomes non-trivial at that scale.
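
As a rough illustration (not one of the approaches benchmarked below, and reusing the placeholder file and the columns 'a'/'b' from the question), the same cleanup can be expressed as vectorized string operations instead of per-cell lambdas:

import numpy as np
import pandas as pd

# Read the two columns as raw strings, then strip the unwanted characters
# in one vectorized pass per column and cast to float32.
df = pd.read_csv(file, sep="\t", usecols=['a', 'b'], dtype=str, skiprows=15)
for col in ['a', 'b']:
    df[col] = df[col].str.replace(r"[^\d.]", "", regex=True).astype(np.float32)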

Let's attack this problem.

!sed 's/.\{4\}/&)/g' 15mil.csv > 15mil_dirty.csv   # insert a ")" after every 4 characters to simulate dirty data

Approaches

First, I tried using modin with the converters argument. Then, I tried a different approach that calls the regexp fewer times:

First, I will create a file-like object that filters everything through your regexp:

import re

class FilterFile():
    def __init__(self, file):
        self.file = file
    def read(self, n):
        # strip everything except digits, dots, commas and newlines
        return re.sub(r"[^\d.,\n]", "", self.file.read(n))
    def write(self, *a): return self.file.write(*a)  # needed to trick pandas
    def __iter__(self, *a): return self.file.__iter__(*a)  # needed

Then we pass it to pandas as the first argument of read_csv:

with open('15mil_dirty.csv') as file:
    df2 = pd.read_csv(FilterFile(file))

Benchmarks:

%%timeit -r 1 -n 1 -t
global df1
df1 = pd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    2min 28s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%%timeit -r 1 -n 1 -t
global df2
df2 = mpd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    38.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%%timeit -r 1 -n 1 -t
global df3
df3 = pd.read_csv(FilterFile(open('15mil_dirty.csv')), header=None,)
    1min ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Seems like modin wins again! Unfortunately, modin has not yet implemented reading from buffers, so I devised the ultimate approach.

%%timeit -r 1 -n 1 -t
with open('15mil_dirty.csv') as f, open('/dev/shm/tmp_file', 'w') as tmp:
    tmp.write(f.read().translate({ord(i):None for i in '()'}))
df4 = mpd.read_csv('/dev/shm/tmp_file', header=None)
    5.68 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

This uses translate, which is considerably faster than re.sub, and it also uses /dev/shm, the in-memory filesystem that Ubuntu (and other Linux distributions) usually provide. Any file written there never goes to disk, so it is fast. Finally, it uses modin to read the file, working around modin's buffer limitation. This approach is about 30 times faster than yours, and it is also pretty simple.
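
If an in-memory filesystem such as /dev/shm is not available, the same translate idea can be kept entirely in memory with plain pandas, which does accept file-like buffers. A minimal sketch (not part of the benchmarks above), at the cost of holding the whole cleaned text in RAM at once:

import io
import pandas as pd

# Strip the '(' and ')' characters in one pass with str.translate,
# then feed the cleaned text to pandas through an in-memory buffer.
with open('15mil_dirty.csv') as f:
    cleaned = f.read().translate({ord(c): None for c in '()'})

df = pd.read_csv(io.StringIO(cleaned), header=None)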
