如何使自定义对象可用于传递给Daskdf.Apply的函数(无法序列化) [英] How to make custom object available for function passed to dask df.apply (cannot serialize)
本文介绍了如何使自定义对象可用于传递给Daskdf.Apply的函数(无法序列化)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
所有这些代码都可以在 pandas 中运行,但单线程运行速度很慢。
我有一个创建速度很慢的对象(它是Bloom Filter)。
我的Dask代码类似于:
def has_match(row, my_filter):
return my_filter.matches(
a=row.a, b =row.b
)
# ....make dask dataframe ddf
ddf['match'] = ddf.apply(has_match, args=(my_filter, ), axis=1, meta=(bool))
ddf.compute()
当我尝试运行此命令时,出现以下错误:
distributed.protocol.core - CRITICAL - Failed to Serialize
我的对象是从C库创建的,所以它不能自动序列化并不奇怪,但我不知道如何解决这个问题。
推荐答案
分布式要求所有中间结果都是可序列化的。在您的例子中,您有一个没有实现Pickle的对象。一般来说,您有几个选项(按从好到差的顺序):
为此对象实现Pickle。请注意,使用copyreg模块可以为不在您控制范围内的类添加Pickle支持。
在函数中手动缓存筛选器的创建。您可以使用对象或模块中的全局变量来执行此操作。请注意,下面的代码需要是导入的模块的一部分,而不是交互会话的一部分(即,不是在jupyter笔记本/IPython会话中)。
例如(未测试):
myfilter = None
def get_or_load():
global myfilter
if myfilter is None:
myfilter = load_filter()
else:
return myfilter
def load_filter():
pass
def has_match(row):
my_filter = get_or_load()
return my_filter.matches(a=row.a, b=row.b)
然后在您的用户代码中:
from my_filter_utils import has_match
ddf['match'] = ddf.apply(has_match, axis=1, meta=('matches', bool))
- 使用DASK管理缓存。为此,请将对象包装在另一个类中,该类在序列化时重新加载该对象。如果您随后将该对象持久化到群集中,则Dask会保留该对象,且最多在每个节点上调用一次创建函数。
例如(未测试):
from dask import delayed
class Wrapper(object):
def __init__(self, func):
self.func = func
self.filter = func()
def __reduce__(self):
# When unpickled, the filter will be reloaded
return (Wrapper, (func,))
def load_filter():
pass
# Create a delayed function to load the filter
wrapper = delayed(Wrapper)(load_filter)
# Optionally persist the wrapper in the cluster, to be reused over multiple computations
wrapper = wrapper.persist()
def has_match(row, wrapper):
return wrapper.filter.matches(a=row.a, b=row.b)
ddf['match'] = ddf.apply(has_match, args=(wrapper,), axis=1, meta=('matches', bool))
这篇关于如何使自定义对象可用于传递给Daskdf.Apply的函数(无法序列化)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文