Simple dask map_partitions example
Question
I read the following SO thread and am now trying to understand it. Here is my example:
import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random
df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) })
def test_f(col_1, col_2):
    return col_1*col_2
ddf = dd.from_pandas(df, npartitions=8)
ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
It generates the error below. What am I doing wrong? Also, I am not clear on how to pass additional parameters to the function in map_partitions.
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
136 try:
--> 137 yield
138 except Exception as e:
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
3130 with raise_on_meta_error(funcname(func)):
-> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3132
TypeError: test_f() got an unexpected keyword argument 'columns'
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-9-913789c7326c> in <module>()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
469 >>> ddf.map_partitions(func).clear_divisions() # doctest: +SKIP
470 """
--> 471 return map_partitions(func, self, *args, **kwargs)
472
473 @insert_meta_param_description(pad=12)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
3163
3164 if meta is no_default:
-> 3165 meta = _emulate(func, *args, **kwargs)
3166
3167 if all(isinstance(arg, Scalar) for arg in args):
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
3129 """
3130 with raise_on_meta_error(funcname(func)):
-> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3132
3133
~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
75 value = type()
76 try:
---> 77 self.gen.throw(type, value, traceback)
78 except StopIteration as exc:
79 # Suppress StopIteration *unless* it's the same exception that
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
148 ).format(" in `{0}`".format(funcname) if funcname else "",
149 repr(e), tb)
--> 150 raise ValueError(msg)
151
152
ValueError: Metadata inference failed in `test_f`.
Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)
Traceback:
---------
File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error
yield
File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
Recommended answer
There is an example in the map_partitions docs that does exactly what you are trying to do:
ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
When you call map_partitions (just like when you call .apply() on a pandas.DataFrame), the function that you map (or apply) is given a dataframe as its first argument.
In the case of dask.dataframe.map_partitions this first argument is a single partition; in the case of pandas.DataFrame.apply it is the whole dataframe.
So your function has to accept a dataframe (a partition) as its first argument, and in your case could look like this:
def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])
Note that the assignment of the new column in this case happens (i.e. gets scheduled to happen) BEFORE you call .compute().
In your example you assign the column AFTER you call .compute(), which defeats the purpose of using dask: once you call .compute(), the results of that operation are loaded into memory if there is enough space for them (if not, you just get a MemoryError).
So for your example to work you could:
1) Use a function (with column names as arguments):
def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])
ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')
# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()
result = ddf_out.compute(get=get) # Will load the whole dataframe into memory
2) Use a lambda (with column names hardcoded in the function):
ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))
# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()
result = ddf_out.compute(get=get) # Will load the whole dataframe into memory
Update:
To apply a function on a row-by-row basis, here is a quote from the post you linked:
map / apply
You can use map
df.mycolumn.map(func)
You can use apply
df.apply(func, axis=1)
I.e. for the example function in your question, it might look like this:
def test_f(dds, col_1, col_2):
    return dds[col_1] * dds[col_2]
Since you will be applying it on a row-by-row basis, the function's first argument will be a series (i.e. each row of a dataframe is a series).
To apply this function you might then call it like this:
dds_out = ddf.apply(
    test_f,
    args=('col_1', 'col_2'),
    axis=1,
    meta=('result', int)
).compute(get=get)
This will return a series named 'result'.
I guess you could also call .apply on each partition with a function, but it does not look to be any more efficient than calling .apply on the dataframe directly. But maybe your tests will prove otherwise.