simple dask map_partitions example

Problem Description

I read the following SO thread and now am trying to understand it. Here is my example:

import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random

df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) })

def test_f(col_1, col_2):
    return col_1*col_2

ddf = dd.from_pandas(df, npartitions=8)

ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
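# ^ This call fails: map_partitions forwards columns=... to test_f,
#   which does not accept that keyword argument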

It generates the error below. What am I doing wrong? Also, I am not clear on how to pass additional parameters to the function in map_partitions.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    136     try:
--> 137         yield
    138     except Exception as e:

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132 

TypeError: test_f() got an unexpected keyword argument 'columns'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-9-913789c7326c> in <module>()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
    469         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    470         """
--> 471         return map_partitions(func, self, *args, **kwargs)
    472 
    473     @insert_meta_param_description(pad=12)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
   3163 
   3164     if meta is no_default:
-> 3165         meta = _emulate(func, *args, **kwargs)
   3166 
   3167     if all(isinstance(arg, Scalar) for arg in args):

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3129     """
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132 
   3133 

~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
     75                 value = type()
     76             try:
---> 77                 self.gen.throw(type, value, traceback)
     78             except StopIteration as exc:
     79                 # Suppress StopIteration *unless* it's the same exception that

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    148                ).format(" in `{0}`".format(funcname) if funcname else "",
    149                         repr(e), tb)
--> 150         raise ValueError(msg)
    151 
    152 

ValueError: Metadata inference failed in `test_f`.

Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)

Traceback:
---------
  File "C:Userssome_userAppDataLocalcondacondaenvs	ensorflowlibsite-packagesdaskdataframeutils.py", line 137, in raise_on_meta_error
    yield
  File "C:Userssome_userAppDataLocalcondacondaenvs	ensorflowlibsite-packagesdaskdataframecore.py", line 3131, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

Recommended Answer

There is an example in the map_partitions docs that achieves exactly what you are trying to do:

ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))

When you call map_partitions (just like when you call .apply() on a pandas.DataFrame), the function that you try to map (or apply) will be given the dataframe as its first argument.

In the case of dask.dataframe.map_partitions, this first argument will be a partition; in the case of pandas.DataFrame.apply, it will be the whole dataframe.
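A quick way to see this (a hypothetical check, not part of the original answer) is to map a small helper that inspects what it receives; each call gets a plain pandas DataFrame holding one partition:

def peek(part):
    # Each call receives one partition, which is a regular pandas DataFrame
    assert isinstance(part, pd.DataFrame)
    return pd.Series([len(part)])

# One row count per partition; dask may also call peek once with an
# empty dummy frame while inferring the output metadata
ddf.map_partitions(peek).compute(get=get)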

This means that your function has to accept a dataframe (a partition) as its first argument, and in your case it could look like this:

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])

Note that the assignment of a new column in this case happens (i.e. gets scheduled to happen) BEFORE you call .compute().

In your example you assign the column AFTER you call .compute(), which somewhat defeats the purpose of using dask. That is, after you call .compute() the results of that operation are loaded into memory, if there is enough space for those results (if not, you just get a MemoryError).

So for your example to work, you could:

1) Use a function (with column names as arguments):

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])


ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')

# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

2) Use a lambda (with column names hardcoded in the function):

ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))

# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory
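Two asides that go beyond the original answer. First, on passing additional parameters: map_partitions also forwards extra keyword arguments (other than reserved ones such as meta) to the mapped function, so option 1 could presumably be written with keywords instead:

ddf_out = ddf.map_partitions(test_f, col_1='col_1', col_2='col_2')

Second, the ValueError above came from failed metadata inference. If inference is slow or fails for your function, map_partitions accepts an explicit meta argument describing the output, which skips the inference step; a minimal sketch using the dict form of meta:

ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2',
                             meta={'col_1': 'i8', 'col_2': 'i8', 'result': 'i8'})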


Update:

To apply a function on a row-by-row basis, here is a quote from the post you linked:

map/apply

You can use map:

df.mycolumn.map(func)

You can use apply:

df.apply(func, axis=1)

That is, for the example function in your question, it might look like this:

def test_f(dds, col_1, col_2):
    return dds[col_1] * dds[col_2]

Since you will be applying it on a row-by-row basis, the function's first argument will be a series (i.e. each row of a dataframe is a series).

To apply this function, you might then call it like this:

dds_out = ddf.apply(
    test_f, 
    args=('col_1', 'col_2'), 
    axis=1, 
    meta=('result', int)
).compute(get=get)

This will return a series named 'result'.
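If the goal is still a new column rather than a standalone series, the row-wise result can also be assigned back lazily before computing, mirroring option 1 above (a sketch, not from the original answer):

ddf['result'] = ddf.apply(test_f, args=('col_1', 'col_2'),
                          axis=1, meta=('result', int))
result = ddf.compute(get=get)  # materializes the whole dataframe at the end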

I guess you could also call .apply on each partition with a function, but it does not look to be any more efficient than calling .apply on the dataframe directly. But maybe your tests will prove otherwise. A sketch of what that might look like follows.
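For completeness, a hedged sketch of that per-partition variant, reusing the row-wise test_f defined above:

dds_out = ddf.map_partitions(
    # Run the row-wise function over each partition with pandas .apply
    lambda part: part.apply(test_f, args=('col_1', 'col_2'), axis=1),
    meta=('result', int),
).compute(get=get)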
