Dask running out of memory even with chunks


Problem description

I'm working with big CSV files and I need to make a Cartesian product (a merge operation). I've tried to tackle the problem with Pandas (you can check the Pandas code and a sample of the data format for the same problem here) without success due to memory errors. Now I'm trying Dask, which is supposed to manage huge datasets even when their size is bigger than the available RAM.

First of all, I read both CSV files:

from dask import dataframe as dd

BLOCKSIZE = 64000000  # = 64 MB chunks


df1_file_path = './mRNA_TCGA_breast.csv'
df2_file_path = './miRNA_TCGA_breast.csv'

# Gets Dataframes
df1 = dd.read_csv(
    df1_file_path,
    delimiter='\t',
    blocksize=BLOCKSIZE
)
first_column = df1.columns.values[0]
df1 = df1.set_index(first_column)  # set_index returns a new dataframe, so assign it back
df2 = dd.read_csv(
    df2_file_path,
    delimiter='\t',
    blocksize=BLOCKSIZE
)
first_column = df2.columns.values[0]
df2 = df2.set_index(first_column)  # set_index returns a new dataframe, so assign it back

# Filter common columns
common_columns = df1.columns.intersection(df2.columns)
df1 = df1[common_columns]
df2 = df2[common_columns]

Then, I perform the operation, storing the result on disk to prevent memory errors:

# Computes a Cartesian product
df1['_tmpkey'] = 1
df2['_tmpkey'] = 1

# Neither of these two options work
# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_hdf('/tmp/merge.*.hdf', key='/merge_data')
# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_parquet('/tmp/')

I've made a repo so you can try it with exactly the same CSV files I'm using. I've tried smaller blocksize values but got the same error. Am I missing something? Any kind of help would be really appreciated.

Recommended answer

I successfully ran your code with the following approach, with memory limited to 32 GB.

I got rid of the blocksize argument and used repartition on df1 and df2 instead:

df1 = df1.repartition(npartitions=50)
df2 = df2.repartition(npartitions=1)

Notice that df2 is much smaller than df1 (2.5 MB vs 23.75 MB), which is why I kept a single partition for df2 and split df1 into 50 partitions.

Doing so should make the code work for you. For me, memory usage stayed below 12 GB.
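
For reference, here is a minimal sketch of the full pipeline with that change applied. The file paths, the common-column filter, the _tmpkey trick and the 50/1 partition split all come from the question and the lines above; the rest is just an assumption about how you would wire it together:

from dask import dataframe as dd

# Read both CSVs without a blocksize argument
df1 = dd.read_csv('./mRNA_TCGA_breast.csv', delimiter='\t')
df2 = dd.read_csv('./miRNA_TCGA_breast.csv', delimiter='\t')

# Repartition by hand: df2 is tiny, so one partition is enough
df1 = df1.repartition(npartitions=50)
df2 = df2.repartition(npartitions=1)

# Keep only the columns both files share
common_columns = df1.columns.intersection(df2.columns)
df1 = df1[common_columns]
df2 = df2[common_columns]

# Cartesian product via a constant join key, written straight to disk
df1['_tmpkey'] = 1
df2['_tmpkey'] = 1
df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_parquet('/tmp/')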

To check, I computed the len of the result:

len(df)  # df is the merged result; returns 3001995

Following the steps above creates a Parquet dataset with 50 partitions. You can use repartition again to get the partition size you want.
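
For example, a minimal sketch of that second repartition, assuming the merge result was written to /tmp/ as above (the output path is just a placeholder):

df = dd.read_parquet('/tmp/')
df = df.repartition(npartitions=10)  # pick whatever number of partitions suits you
df.to_parquet('/tmp/merged_repartitioned/')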

Note:

Adding this should speed up your code:

from dask.distributed import Client
client = Client()

In my case, I had to use Client(processes=False) because of my running environment.
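
In other words, a sketch of that variant (the same snippet as above, only with processes=False):

from dask.distributed import Client
client = Client(processes=False)  # run workers as threads instead of separate processes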
