How to map a dask Series with a large dict


Problem description


I'm trying to figure out the best way to map a dask Series with a large mapping. The straightforward series.map(large_mapping) issues UserWarning: Large object of size <X> MB detected in task graph and suggests using client.scatter and client.submit, but the latter doesn't solve the problem and is in fact much slower. Trying broadcast=True in client.scatter doesn't help either.

import argparse
import distributed
import dask.dataframe as dd

import numpy as np
import pandas as pd


def compute(s_size, m_size, npartitions, scatter, broadcast, missing_percent=0.1, seed=1):
    np.random.seed(seed)
    mapping = dict(zip(np.arange(m_size), np.random.random(size=m_size)))
    # randint requires an integer bound; cast the float product explicitly
    ps = pd.Series(np.random.randint(int((1 + missing_percent) * m_size), size=s_size))
    ds = dd.from_pandas(ps, npartitions=npartitions)
    if scatter:
        mapping_futures = client.scatter(mapping, broadcast=broadcast)
        future = client.submit(ds.map, mapping_futures)
        return future.result()
    else:
        return ds.map(mapping)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', default=200000, type=int, help='series size')
    parser.add_argument('-m', default=50000, type=int, help='mapping size')
    parser.add_argument('-p', default=10, type=int, help='partitions number')
    parser.add_argument('--scatter', action='store_true', help='Scatter mapping')
    parser.add_argument('--broadcast', action='store_true', help='Broadcast mapping')
    args = parser.parse_args()

    client = distributed.Client()
    ds = compute(args.s, args.m, args.p, args.scatter, args.broadcast)
    print(ds.compute().describe())

Answer

Your problem is here:

In [4]: mapping = dict(zip(np.arange(50000), np.random.random(size=50000)))

In [5]: import pickle

In [6]: %time len(pickle.dumps(mapping))
CPU times: user 2.24 s, sys: 18.6 ms, total: 2.26 s
Wall time: 2.25 s
Out[6]: 6268809


So mapping is big and unpartitioned - the scatter operation is the one giving you the problem in this case.
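A quick way to see why the dict itself is the bottleneck is to compare its pickled size against an equivalent NumPy array holding the same values (a sketch for illustration, not part of the original answer):

```python
import pickle

import numpy as np

m_size = 50_000
mapping = dict(zip(np.arange(m_size), np.random.random(size=m_size)))
values = np.random.random(size=m_size)

# The dict pickles every key/value pair as an individual Python object;
# the array pickles as one contiguous buffer.
dict_bytes = len(pickle.dumps(mapping))
array_bytes = len(pickle.dumps(values))
```

The dict comes out at several megabytes (matching the timing shown above), while the array stays well under half a megabyte, so anything that has to ship the dict through the task graph pays that cost.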

Consider instead

def make_mapping():
    return dict(zip(np.arange(50000), np.random.random(size=50000)))

mapping = client.submit(make_mapping)  # ships the function, not the data
                                       # and requires no serialisation
future = client.submit(ds.map, mapping)


This will not show the warning. However, it seems strange to me to use a dictionary here to do the mapping; a Series or a plain array seems to encode the nature of the data better.
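Since the keys here are consecutive integers starting at zero, the dict can be replaced by plain array indexing, which is what the closing remark is hinting at. A minimal sketch of that idea (assuming every key is in range, i.e. ignoring the missing-key fraction the original script simulates):

```python
import numpy as np

m_size = 50_000
rng = np.random.default_rng(1)
values = rng.random(m_size)                  # values[k] plays the role of mapping[k]
keys = rng.integers(0, m_size, size=200_000)

# Vectorised lookup: no dict travels through any task graph,
# only a contiguous float array.
mapped = values[keys]
```

Inside dask this lookup would run per partition (e.g. via map_partitions), and out-of-range keys would need explicit masking, whereas mapping with a pandas Series handles missing keys by producing NaN.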
