Killed/MemoryError when creating a large dask.dataframe from delayed collection
Problem description
I am trying to create a dask.dataframe from a bunch of large CSV files (currently 12 files, 8-10 million lines and 50 columns each). A few of them might fit together into my system memory but all of them at once definitely will not, hence the use of dask instead of regular pandas.
Since reading each csv file involves some extra work (adding columns with data from the file path), I tried creating the dask.dataframe from a list of delayed objects, similar to this example.
Here is my code:
import dask.dataframe as dd
from dask.delayed import delayed
import os
import pandas as pd


def read_file_to_dataframe(file_path):
    df = pd.read_csv(file_path)
    df['some_extra_column'] = 'some_extra_value'
    return df


if __name__ == '__main__':
    path = '/path/to/my/files'
    delayed_collection = list()
    for rootdir, subdirs, files in os.walk(path):
        for filename in files:
            if filename.endswith('.csv'):
                file_path = os.path.join(rootdir, filename)
                delayed_reader = delayed(read_file_to_dataframe)(file_path)
                delayed_collection.append(delayed_reader)

    df = dd.from_delayed(delayed_collection)
    print(df.compute())
When starting this script (Python 3.4, dask 0.12.0), it runs for a couple of minutes while my system memory constantly fills up. When it is fully used, everything starts lagging and it runs for some more minutes, then it crashes with killed or MemoryError.
I thought the whole point of dask.dataframe was to be able to operate on larger-than-memory dataframes that span over multiple files on disk, so what am I doing wrong here?
edit: Reading the files instead with df = dd.read_csv(path + '/*.csv') seems to work fine as far as I can see. However, this does not allow me to alter each single dataframe with additional data from the file path.
edit #2: Following MRocklin's answer, I tried reading my data with dask's read_bytes() method, using the single-threaded scheduler, and doing both in combination. Still, even when reading chunks of 100MB in single-threaded mode on a laptop with 8GB of memory, my process gets killed sooner or later. Running the code below on a bunch of small files (around 1MB each) of similar shape works fine, though. Any ideas what I am doing wrong here?
import dask
from dask.bytes import read_bytes
import dask.dataframe as dd
from dask.delayed import delayed
from io import BytesIO
import pandas as pd


def create_df_from_bytesio(bytesio):
    df = pd.read_csv(bytesio)
    return df


def create_bytesio_from_bytes(block):
    bytesio = BytesIO(block)
    return bytesio


path = '/path/to/my/files/*.csv'
sample, blocks = read_bytes(path, delimiter=b'\n', blocksize=1024*1024*100)
delayed_collection = list()
for datafile in blocks:
    for block in datafile:
        bytesio = delayed(create_bytesio_from_bytes)(block)
        df = delayed(create_df_from_bytesio)(bytesio)
        delayed_collection.append(df)

dask_df = dd.from_delayed(delayed_collection)
print(dask_df.compute(get=dask.async.get_sync))
Answer
If each of your files is large then a few concurrent calls to read_file_to_dataframe might be flooding memory before Dask ever gets a chance to be clever.
Dask tries to operate in low memory by running functions in an order such that it can delete intermediate results quickly. However if the results of just a few functions can fill up memory then Dask may never have a chance to delete things. For example if each of your functions produced a 2GB dataframe and if you had eight threads running at once, then your functions might produce 16GB of data before Dask's scheduling policies can kick in.
The reason why read_csv works is that it chunks up large CSV files into many ~100MB blocks of bytes (see the blocksize= keyword argument). You could do this too, although it's tricky because you always need to break on an endline.
The dask.bytes.read_bytes function can help you here. It can convert a single path into a list of delayed objects, each corresponding to a byte range of that file that starts and stops cleanly on a delimiter. You would then put these bytes into an io.BytesIO (standard library) and call pandas.read_csv on that. Beware that you'll also have to handle headers and such. The docstring to that function is extensive and should provide more help.
In the example above everything would be fine if we didn't have the 8x multiplier from parallelism. I suspect that if you only ran a single function at once that things would probably pipeline without ever reaching your memory limit. You can set dask to use only a single thread with the following line
dask.set_options(get=dask.async.get_sync)
Note: For Dask versions >= 0.15, you need to use dask.local.get_sync instead.
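For reference, the scheduler-selection API has moved again since this answer was written; in current dask the config system replaces set_options entirely (version boundaries here are approximate):

```python
import dask

# pre-0.15:   dask.set_options(get=dask.async.get_sync)
# >= 0.15:    dask.set_options(get=dask.local.get_sync)
# current dask: select the single-threaded scheduler via the config system
dask.config.set(scheduler='synchronous')
```

The same setting can also be passed per call, e.g. df.compute(scheduler='synchronous').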
If you make a dask.dataframe and then compute it immediately
ddf = dd.read_csv(...)
df = ddf.compute()
You're loading in all of the data into a Pandas dataframe, which will eventually blow up memory. Instead it's better to operate on the Dask dataframe and only compute on small results.
# result = df.compute() # large result fills memory
result = df.groupby(...).column.mean().compute() # small result
Convert to a different format
CSV is a pervasive and pragmatic format, but also has some flaws. You might consider a data format like HDF5 or Parquet.