Dask Dataframe nunique operation: Worker running out of memory (MRE)

Problem description

tl;dr

I want to

dd.read_parquet('*.parq')['column'].nunique().compute()

but I get

WARNING - Worker exceeded 95% memory budget. Restarting

a couple of times before the workers get killed altogether.


Long version

I have a dataset with

  • 10 billion rows,
  • ~20 columns,

and a single machine with around 200GB memory. I am trying to use dask's LocalCluster to process the data, but my workers quickly exceed their memory budget and get killed even if I use a reasonably small subset and try using basic operations.

I have recreated a toy problem demonstrating the issue below.

Synthetic data

To approximate the problem above on a smaller scale, I will create a single column with 32-character ids with

  • a million unique ids
  • total length of 200 million rows
  • split into 100 parquet files

The result will be

  • 100 files, 66MB each, taking 178MB when loaded as a Pandas dataframe (estimated by df.memory_usage(deep=True).sum())
  • If loaded as a pandas dataframe, all the data takes about 20GB in memory
  • A single Series with all ids (which is what I assume the workers also have to keep in memory when computing nunique) takes about 90MB

import string
import os

import numpy as np
import pandas as pd

chars = string.ascii_letters + string.digits

n_total = int(2e8)
n_unique = int(1e6)

# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)

outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)

# Sample from the ids to create 100 parquet files
for i in range(100):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
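
As a quick sanity check of the size estimates listed above, here is a small sketch, assuming the generation snippet has just been run (so ids and outputdir are still in scope):

# Footprint of one generated file when loaded into pandas (~178MB expected)
one_file = pd.read_parquet(os.path.join(outputdir, 'test-000.snappy.parq'))
print(one_file.memory_usage(deep=True).sum() / 1e6)

# Footprint of a pandas Series holding all 1e6 unique ids (~90MB expected)
print(pd.Series(ids).memory_usage(deep=True) / 1e6)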

Attempt at a solution

Let's assume that my machine only has 8GB of memory. Since the partitions take about 178MB and the result 90MB, according to Wes McKinney's rule of thumb, I might need up to 2-3GB of memory. Therefore, either

  • n_workers=2, memory_limit='4GB', or
  • n_workers=1, memory_limit='8GB'

seems like a good choice. Sadly, when I try it, I get

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

a couple of times, before the worker(s) get killed altogether.

import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)

dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()

In fact it seems that, with 4 workers for example, they each need 6GB of memory before being able to perform the task.

Can this situation be improved?

Solution

That's a great example of a recurring problem. The only shocking thing is that delayed was not used during the synthetic data creation:

import dask

# Reuses ids, n_total, outputdir, np, pd and os from the data-generation snippet above
@dask.delayed
def create_sample(i):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')

# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)

For the following answer I will actually just use a small number of partitions (so change to range(5)) to have sane visualizations. Let's start with the loading:

df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])
print(df.npartitions) # 5

This is a minor point, but having columns=['id'] in .read_parquet() exploits the parquet advantage of columnar extraction, so only that column is read from disk (dask may do some of this optimization behind the scenes, but if you know the columns you want, there's no harm in being explicit).

Now, when you run df['id'].nunique(), here's the DAG that dask will compute:
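
One way to render this DAG for yourself, assuming graphviz is installed, is dask's .visualize() method:

# Write the task graph for the nunique computation to an image file (requires graphviz)
df['id'].nunique().visualize('nunique-dag.png')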

With more partitions, there would be more steps, but it's apparent that there's a potential bottleneck when each partition is trying to send back a fairly large intermediate result. These results can be very large for high-cardinality columns, so if each worker tries to send a result object of around 100MB, the receiving worker will have to have 5 times that memory available to accept the data (which could potentially decrease after further value-counting).

An additional consideration is how many tasks a single worker can run at a given time. The easiest way to control how many tasks can run at the same time on a given worker is resources. If you start the cluster with resources:

cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})

Then every worker has the specified resources (in this case, 1 unit of the arbitrary resource foo), so if you think that processing of a single partition should happen one at a time per worker (due to the high memory footprint), you can do:

# note, no split_every is needed in this case since we're just 
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})

This will ensure that any single worker is busy with at most 1 of these tasks at a time, preventing excessive memory usage. (Side note: there's also .nunique_approx(), which may be of interest.)
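
For completeness, a minimal sketch of that approximate variant: it returns an approximate distinct count and avoids holding every unique id in memory, at the cost of exactness:

# Approximate distinct count; much lighter on memory than an exact nunique
print(df['id'].nunique_approx().compute())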

To control the amount of data that any given worker receives for further processing, one approach is to use the split_every option. Here's what the DAG will look like with split_every=3:
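
A sketch of that call (split_every controls how many partition-level results are combined per reduction step; the graph can again be rendered with .visualize(), assuming graphviz is available):

# Combine at most 3 partition-level results per reduction step
result = df['id'].nunique(split_every=3)
result.visualize('nunique-split3.png')   # requires graphviz
print(result.compute())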

You can see that now (for this number of partitions), the maximum memory that a worker will need is about 3 times the size of the largest partition's result. So depending on your worker memory settings, you might want to set split_every to a low value (2, 3, 4 or so).

In general, the more unique values the variable has, the more memory is needed for each partition's object of unique values, and so a lower value of split_every is going to be useful to put a cap on the maximum memory usage. If the variable has few unique values, then each individual partition's unique count will be a small object, so there's no need to have a split_every restriction.
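
Putting the pieces together for the original 8GB scenario, a rough sketch (the worker count, memory limit and split_every value here are illustrative, not prescriptive):

import os

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# One 'foo' unit per worker so that each worker handles one heavy task at a time
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
client = Client(cluster)

df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])

# A low split_every caps how many partition results one worker must hold at once
print(df['id'].nunique(split_every=3).compute(resources={'foo': 1}))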
