Killed/MemoryError when creating a large dask.dataframe from delayed collection

Problem description

I am trying to create a dask.dataframe from a bunch of large CSV files (currently 12 files, 8-10 million lines and 50 columns each). A few of them might fit together into my system memory but all of them at once definitely will not, hence the use of dask instead of regular pandas.

Since reading each csv file involves some extra work (adding columns with data from the file path), I tried creating the dask.dataframe from a list of delayed objects, similar to this example.

Here is my code:

import dask.dataframe as dd
from dask.delayed import delayed
import os
import pandas as pd

def read_file_to_dataframe(file_path):
    df = pd.read_csv(file_path)
    df['some_extra_column'] = 'some_extra_value'
    return df

if __name__ == '__main__':
    path = '/path/to/my/files'
    delayed_collection = list()
    for rootdir, subdirs, files in os.walk(path):
        for filename in files:
            if filename.endswith('.csv'):
                file_path = os.path.join(rootdir, filename)
                delayed_reader = delayed(read_file_to_dataframe)(file_path)
                delayed_collection.append(delayed_reader)

    df = dd.from_delayed(delayed_collection)
    print(df.compute())

When starting this script (Python 3.4, dask 0.12.0), it runs for a couple of minutes while my system memory steadily fills up. Once memory is exhausted, everything starts lagging; it runs for a few more minutes and then crashes with Killed or a MemoryError.

I thought the whole point of dask.dataframe was to be able to operate on larger-than-memory dataframes that span multiple files on disk, so what am I doing wrong here?

edit: Reading the files with df = dd.read_csv(path + '/*.csv') instead seems to work fine as far as I can tell. However, it does not let me alter each individual dataframe with additional data derived from the file path.
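For reference, dask releases newer than the 0.12 used here add an include_path_column option to read_csv, which records each row's source file as a column. A minimal sketch, assuming such a version is available:

import dask.dataframe as dd

# each row gets a 'path' column with its originating file (not available in dask 0.12)
df = dd.read_csv('/path/to/my/files/*.csv', include_path_column=True)

The path column can then be turned into whatever extra columns are needed with normal dataframe operations.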

edit #2: Following MRocklin's answer, I tried reading my data with dask's read_bytes() method, using the single-threaded scheduler, and combining the two. Still, even when reading 100MB chunks in single-threaded mode on a laptop with 8GB of memory, my process gets killed sooner or later. Running the code below on a bunch of small files (around 1MB each) of similar shape works fine, though. Any idea what I am doing wrong here?

import dask
from dask.bytes import read_bytes
import dask.dataframe as dd
from dask.delayed import delayed
from io import BytesIO
import pandas as pd

def create_df_from_bytesio(bytesio):
    df = pd.read_csv(bytesio)
    return df

def create_bytesio_from_bytes(block):
    bytesio = BytesIO(block)
    return bytesio


path = '/path/to/my/files/*.csv'

sample, blocks = read_bytes(path, delimiter=b'\n', blocksize=1024*1024*100)
delayed_collection = list()
for datafile in blocks:  # blocks holds one list of delayed byte blocks per input file
    for block in datafile:
        bytesio = delayed(create_bytesio_from_bytes)(block)
        df = delayed(create_df_from_bytesio)(bytesio)
        delayed_collection.append(df)

dask_df = dd.from_delayed(delayed_collection)
print(dask_df.compute(get=dask.async.get_sync))

Recommended answer

If each of your files is large then a few concurrent calls to read_file_to_dataframe might be flooding memory before Dask ever gets a chance to be clever.

Dask tries to operate in low memory by running functions in an order such that it can delete intermediate results quickly. However, if the results of just a few functions can fill up memory, then Dask may never have a chance to delete things. For example, if each of your functions produced a 2GB dataframe and you had eight threads running at once, then your functions might produce 16GB of data before Dask's scheduling policies can kick in.

The reason why read_csv works is that it chunks up large CSV files into many ~100MB blocks of bytes (see the blocksize= keyword argument). You could do this too, although it's tricky because you always need to break on a line ending.
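For example, something like this (a minimal sketch; the glob path and the 100MB blocksize are illustrative) lets read_csv do the chunking for you:

import dask.dataframe as dd

# each CSV is split into ~100MB byte blocks, so no single partition
# has to hold an entire large file in memory
df = dd.read_csv('/path/to/my/files/*.csv', blocksize=100 * 2**20)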

The dask.bytes.read_bytes function can help you here. It can convert a single path into a list of delayed objects, each corresponding to a byte range of that file that starts and stops cleanly on a delimiter. You would then put these bytes into an io.BytesIO (standard library) and call pandas.read_csv on that. Beware that you'll also have to handle headers and such. The docstring to that function is extensive and should provide more help.
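A rough sketch of that approach might look like the following. The block_to_df helper is mine, not part of dask's API, and it assumes every file shares the same header:

import pandas as pd
import dask.dataframe as dd
from dask.bytes import read_bytes
from dask.delayed import delayed
from io import BytesIO

def block_to_df(block, columns, is_first_block):
    # turn one delimited byte block back into a pandas dataframe
    bio = BytesIO(block)
    if is_first_block:
        df = pd.read_csv(bio)                      # first block of a file still carries the header row
    else:
        df = pd.read_csv(bio, header=None, names=columns)
    df['some_extra_column'] = 'some_extra_value'   # per-file extras would be added here
    return df

sample, blocks = read_bytes('/path/to/my/files/*.csv',
                            delimiter=b'\n', blocksize=100 * 2**20)
# column names taken from the sampled head of the first file
columns = pd.read_csv(BytesIO(sample), nrows=0).columns.tolist()

parts = []
for file_blocks in blocks:                         # one list of delayed byte blocks per file
    for i, block in enumerate(file_blocks):
        parts.append(delayed(block_to_df)(block, columns, i == 0))

df = dd.from_delayed(parts)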

In the example above, everything would be fine if we didn't have the 8x multiplier from parallelism. I suspect that if you only ran a single function at once, things would probably pipeline without ever reaching your memory limit. You can set dask to use only a single thread with the following line:

dask.set_options(get=dask.async.get_sync)

Note: For Dask versions >= 0.15, you need to use dask.local.get_sync instead.
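In other words, something like this (a sketch; it assumes set_options itself is still present in your version):

import dask
import dask.local

dask.set_options(get=dask.local.get_sync)  # same idea, for dask >= 0.15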

If you make a dask.dataframe and then compute it immediately

ddf = dd.read_csv(...)
df = ddf.compute()

You're loading all of the data into a Pandas dataframe, which will eventually blow up memory. Instead, it's better to operate on the Dask dataframe and only compute small results.

# result = df.compute()  # large result fills memory
result = df.groupby(...).column.mean().compute()  # small result

Convert to a different format

CSV is a pervasive and pragmatic format, but also has some flaws. You might consider a data format like HDF5 or Parquet.
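For instance, something along these lines (a sketch; it assumes a recent enough dask plus a parquet engine such as pyarrow or fastparquet, and the output directory is made up):

import dask.dataframe as dd

# one-off conversion: write the CSVs out as Parquet
ddf = dd.read_csv('/path/to/my/files/*.csv', blocksize=100 * 2**20)
ddf.to_parquet('/path/to/my/files/parquet/')

# later runs read the Parquet copy, which is faster and keeps column types
df = dd.read_parquet('/path/to/my/files/parquet/')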
