Dask: why is memory usage blowing up?

Problem Description

I have a small dataframe (~100 MB) and an expensive computation that I want to perform for each row. It is not a vectorizable computation; it requires some parsing and a database lookup for each row.

As such, I decided to try Dask to parallelize the task. The task is "embarrassingly parallel", and order of execution or repeated execution is not an issue. However, for some unknown reason, memory usage blows up to about 100 GB.

Here is the offending code sample:

import pandas as pd
import numpy as np
import dask.dataframe as dd

from dask.distributed import Client
from dask_jobqueue import LSFCluster

# 16 single-core LSF workers with 6GB of memory each
cluster = LSFCluster(memory="6GB", cores=1, project='gRNA Library Design')
cluster.scale(jobs=16)
client = Client(cluster)

# load_big_dict, expensive_computation and args are defined elsewhere in the script;
# the dictionary is ~1.5GB in total
required_dict = load_big_dict()
score_guide = lambda row: expensive_computation(required_dict, row)

library_df = pd.read_csv(args.library_csv)

# Output metadata: the input columns plus the columns added by the computation
meta = library_df.dtypes
meta = meta.append(pd.Series({
    'specificity': np.dtype('int64'),
    'cutting_efficiency': np.dtype('int64'),
    '0 Off-targets': np.dtype('object'),
    '1 Off-targets': np.dtype('object'),
    '2 Off-targets': np.dtype('object'),
    '3 Off-targets': np.dtype('object')}))

library_ddf = dd.from_pandas(library_df, npartitions=32)
library_ddf = library_ddf.apply(score_guide, axis=1, meta=meta)
library_ddf = library_ddf.compute()
library_ddf = library_ddf.drop_duplicates()
library_ddf.to_csv(args.outfile, index=False)

My guess is that the big dictionary required for the lookup is somehow the issue, but its total size is only ~1.5 GB and it is not included in the resulting dataframe.

Why might Dask be blowing up memory usage?

Recommended Answer

The problem is that required_dict needs to be serialized and sent to every worker. Because required_dict is large and many workers need it at the same time, the repeated serialization causes a massive memory blowup.
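
A quick way to confirm this (my own diagnostic sketch, not part of the original answer): score_guide is a closure over required_dict, so serializing the function drags the whole dictionary along with it.

import cloudpickle  # the serializer Dask falls back to for functions that plain pickle rejects

# score_guide closes over required_dict, so pickling the function also
# pickles the ~1.5GB dictionary -- and that payload travels with the tasks.
payload = cloudpickle.dumps(score_guide)
print(f"serialized closure size: {len(payload) / 1e9:.2f} GB")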

There are many fixes; for me, the easiest was simply to load the dictionary inside the workers and explicitly use map_partitions instead of apply.

Here is the solution in code:

def do_df(df):
    # Load the dictionary inside the worker, so it never has to be
    # serialized on the client and shipped along with every task
    required_dict = load_big_dict()
    score_guide = lambda row: expensive_computation(required_dict, row)
    return df.apply(score_guide, axis=1)

library_ddf = dd.from_pandas(library_df, npartitions=128)
library_ddf = library_ddf.map_partitions(do_df)
library_ddf = library_ddf.compute()
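
As the answer says, there are many fixes; here is a sketch of a different one (my addition, not part of the original answer), reusing the asker's load_big_dict and expensive_computation helpers and the meta object built in the question. Wrapping the dictionary load in dask.delayed makes it a single task on the cluster, so workers share one result instead of every partition reloading it. Note that meta must then be given explicitly, because Dask cannot run its usual metadata inference when a Delayed argument is passed to map_partitions.

import dask

def do_df(df, required_dict):
    # required_dict arrives as an ordinary argument instead of being
    # captured in a closure that is serialized along with every task
    return df.apply(lambda row: expensive_computation(required_dict, row), axis=1)

# Load the dictionary once as its own task on the cluster; workers share
# that single result rather than each partition calling load_big_dict()
big_dict = dask.delayed(load_big_dict)()

library_ddf = dd.from_pandas(library_df, npartitions=128)
library_ddf = library_ddf.map_partitions(do_df, big_dict, meta=meta)
library_ddf = library_ddf.compute()

When the dictionary is already in memory on the client, client.scatter(required_dict, broadcast=True) achieves a similar effect by shipping it to the workers once up front.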
