Dask: Running out of memory during filtering (MRE)
tl;dr
I want to filter a Dask dataframe based on a value of a column, i.e.
data.loc[data[column].lt(value)].to_parquet(path)
but I run out of memory doing so, despite each partition being 20 times smaller than the available memory.
Sample data
Let's first create some sample data to work with
import numpy as np
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800) # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')
Solution attempt
Let's assume that my machine only has 512MB of memory. My real problem is of a much bigger scale (terabytes), but this small example seems to capture the same issue I have with the larger dataset.
I will thus use two workers with 200MB each:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)
data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)
Since each partition takes 10MB in memory, according to Wes McKinney's rule of thumb, I might need at most 50-100MB of memory to process a partition, so 200MB should be more than enough.
However, when I run task.compute(), the workers almost immediately run out of memory, get restarted, and are later killed altogether.
Things I tried
Limiting resources
I have also tried to limit worker resources. As far as I understand, this should let the worker know that it can only process one task at a time. Perhaps this is too conservative, but in that case, I'd expect a deadlock to happen, not to run out of memory.
cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})
However, the result is sadly the same.
Profiling memory usage of reading parquet
SultanOrazbayev suggested that I should use memory_profiler to see how much memory is used while loading a single partition, since read_parquet is likely the culprit here.
I wrote test-load.py:
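import pandas as pd

# `profile` is injected by memory_profiler when the script is run with
# `python3 -m memory_profiler`; it is not defined under plain `python3`.
@profile
def load_parq():
    return pd.read_parquet('sample.parq/part.0.parquet')

if __name__ == '__main__':
    df = load_parq()
    print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')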
I ran it using python3 -m memory_profiler test-load.py. This is the output:
Memory footprint: 10.0MB
Filename: test-load.py
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     3   74.750 MiB   74.750 MiB           1   @profile
     4                                         def load_parq():
     5  132.992 MiB   58.242 MiB           1       return pd.read_parquet('sample.parq/part.0.parquet')
And, fair enough, even reading a single file requires more memory than I thought. 200MB might not be enough, but how much is?
The answer, on my setup, turns out to be about 4GB for each of the two workers. That is actually the size of the whole dataset. And in fact, looking at the dashboard, dask seems to happily load dozens of partitions at once. That's fine if it has 4GB of memory, but how can I proceed if it doesn't have that much?
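To put the single-partition profile in proportion, the numbers printed by memory_profiler above can be compared with the DataFrame footprint. This is only a back-of-the-envelope sketch: the constants are copied from that output, the variable names are made up for illustration, and the footprint was floor-divided by 1e6 in the script, so the ratio is approximate.

```python
MIB = 1024 ** 2  # memory_profiler reports mebibytes
MB = 1e6         # test-load.py divides the footprint by 1e6

footprint_bytes = 10.0 * MB     # "Memory footprint: 10.0MB"
increment_bytes = 58.242 * MIB  # Increment column for pd.read_parquet
peak_bytes = 132.992 * MIB      # Mem usage column after the read

# How many times the final DataFrame size the read itself allocated.
overhead_factor = increment_bytes / footprint_bytes

print(f"read overhead: {overhead_factor:.1f}x the in-memory footprint")
print(f"process total after one read: {peak_bytes / MB:.0f} MB")
```

The read costs roughly six times the size of the resulting DataFrame, which sits inside the 5-10x rule of thumb mentioned earlier. A single read therefore does not explain a 4GB requirement on its own.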
I think I have figured out the culprit in this case. The problem is that dask automatically attempted to use as many cores as were available.
from dask.distributed import Client
with Client(n_workers=2, memory_limit='300Mb') as client:
    print(client)
produces
<Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>
When I thus attempt to read a parquet file, dask uses all 48 available cores, instantly running out of memory.
The trick here is to limit the number of threads per worker:
with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
    print(client)
which yields
<Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>
and the computation then proceeds without any problems, using about 200-250MB per worker at any point in time.
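The effect of the thread count can be sketched with a rough estimate. The function below is hypothetical (it is not part of dask). It assumes every thread on a worker reads one partition at the same moment, and it reuses the baseline and increment from the memory_profiler output above as if they applied to a dask worker, which is only an approximation. The 300e6-byte limit is inferred from the `memory=600.00 MB` shown for two workers.

```python
MIB = 1024 ** 2

def estimated_worker_peak_mib(threads_per_worker,
                              baseline_mib=74.750,   # process before the read (profiler output)
                              per_read_mib=58.242):  # increment of one read_parquet call
    """Rough peak memory if all threads load a partition simultaneously."""
    return baseline_mib + threads_per_worker * per_read_mib

total_threads = 48   # from the first Client repr: threads=48
n_workers = 2
limit_mib = 300e6 / MIB  # assumed meaning of memory_limit='300Mb'

default_peak = estimated_worker_peak_mib(total_threads // n_workers)  # 24 threads per worker
fixed_peak = estimated_worker_peak_mib(1)                             # threads_per_worker=1

print(f"limit per worker:      {limit_mib:7.1f} MiB")
print(f"24 threads per worker: {default_peak:7.1f} MiB")
print(f"1 thread per worker:   {fixed_peak:7.1f} MiB")
```

Under these assumptions, 24 concurrent reads come to roughly 1.4 GiB per worker, several times the limit, while a single thread stays below it. The single-thread estimate is lower than the 200-250MB observed, presumably because a real worker also holds the filtered output and its own overhead, so the figures are order-of-magnitude only.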