Dask: Running out of memory during filtering (MRE)

Problem description

tl;dr

I want to filter a Dask dataframe based on a value of a column, i.e.

data.loc[data[column].lt(value)].to_parquet(path)

but I run out of memory doing so, despite each partition being 20-times smaller than the available memory.

Sample data

Let's first create some sample data to work with

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800)  # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')
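
As a quick sanity check on the per-partition figure, one can materialize a single partition and measure it directly; a minimal sketch, assuming the ddf from the snippet above is still in scope:

part = ddf.partitions[0].compute()  # one partition as a plain pandas DataFrame
print(f'{part.memory_usage(deep=True).sum() / 1e6:.1f} MB')  # in-memory size of this partition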

Solution attempt

Let's assume that my machine only has 512Mb of memory. Of course, my real problem is of a much bigger scale (terabytes in my case), but this simple example seems to reproduce the same issue I run into with the larger dataset.

I will thus use two workers with 200Mb each

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)

data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)

Since each partition takes 10Mb in memory, according to Wes McKinney's rule of thumb, I might need at most 50-100Mb of memory to process a partition, so 200Mb should be more than enough.

However, when I run task.compute(), the workers almost immediately run out of memory, get restarted, and are later killed altogether.

Things I tried

Limiting resources

I have also tried to limit worker resources. As far as I understand, this should let the worker know that it can only process one task at a time. Perhaps this is too conservative, but in that case, I'd expect a deadlock to happen, not to run out of memory.

cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})

However, the result is sadly the same.

Profiling memory usage of reading parquet

SultanOrazbayev suggested that I should use memory_profiler to see how much memory is used during loading a single partition, since the usage of read_parquet is likely the culprit here.

I wrote test-load.py

import pandas as pd

@profile
def load_parq():
    return pd.read_parquet('sample.parq/part.0.parquet')

if __name__ == '__main__':
    df = load_parq()
    print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')

I ran it using python3 -m memory_profiler test-load.py. This is the output:

Memory footprint: 10.0MB
Filename: test-load.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     3   74.750 MiB   74.750 MiB           1   @profile
     4                                         def load_parq():
     5  132.992 MiB   58.242 MiB           1       return pd.read_parquet('sample.parq/part.0.parquet')

And, fair enough, even reading a single file requires more memory than I thought. 200MB might well not be enough, but how much is enough?

The answer, on my setup, turns out to be about 4GB for each of the two workers. That's actually the size of the whole dataset. And in fact, looking at the dashboard, dask seems to be happily loading dozens of partitions at once. That's fine if it has the 4GB of memory, but how can I proceed if it doesn't have that much?

Solution

I think I have figured out the culprit in this case. The problem is that dask automatically tries to use as many cores as are available.

from dask.distributed import Client
with Client(n_workers=2, memory_limit='300Mb') as client:
    print(client)

produces

<Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>

When I thus attempt to read a parquet file, dask uses all 48 available cores, instantly running out of memory.

The trick here is to limit the number of threads per worker:

with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
    print(client)

which yields

<Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>

and the computation then proceeds without any problems, using about 200-250MB per worker at any point in time.
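
For completeness, here is a minimal end-to-end sketch of the filtering job with the thread limit applied, assuming the same sample.parq written above:

import dask.dataframe as dd
from dask.distributed import Client

# One thread per worker limits each worker to one task at a time,
# which keeps memory usage to roughly 200-250MB per worker on this example.
with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
    data = dd.read_parquet('sample.parq')
    data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq')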

Related question
