如何使自定义对象可用于传递给Daskdf.Apply的函数(无法序列化) [英] How to make custom object available for function passed to dask df.apply (cannot serialize)

查看:15
本文介绍了如何使自定义对象可用于传递给Daskdf.Apply的函数(无法序列化)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所有这些代码都可以在 pandas 中运行,但单线程运行速度很慢。

我有一个创建速度很慢的对象(它是Bloom Filter)。

我的Dask代码类似于:

def has_match(row, my_filter):
    return my_filter.matches(
        a=row.a, b =row.b
    )

# ....make dask dataframe ddf

ddf['match'] = ddf.apply(has_match, args=(my_filter, ), axis=1, meta=(bool))
ddf.compute()

当我尝试运行此命令时,出现以下错误:

distributed.protocol.core - CRITICAL - Failed to Serialize

我的对象是从C库创建的,所以它不能自动序列化并不奇怪,但我不知道如何解决这个问题。

推荐答案

分布式要求所有中间结果都是可序列化的。在您的例子中,您有一个没有实现Pickle的对象。一般来说,您有几个选项(按从好到差的顺序):

  • 为此对象实现Pickle。请注意,使用copyreg模块可以为不在您控制范围内的类添加Pickle支持。

  • 在函数中手动缓存筛选器的创建。您可以使用对象或模块中的全局变量来执行此操作。请注意,下面的代码需要是导入的模块的一部分,而不是交互会话的一部分(即,不是在jupyter笔记本/IPython会话中)。

例如(未测试):

myfilter = None


def get_or_load():
    global myfilter
    if myfilter is None:
        myfilter = load_filter()
    else:
        return myfilter


def load_filter():
    pass


def has_match(row):
    my_filter = get_or_load()
    return my_filter.matches(a=row.a, b=row.b)

然后在您的用户代码中:

from my_filter_utils import has_match

ddf['match'] = ddf.apply(has_match, axis=1, meta=('matches', bool))
  • 使用DASK管理缓存。为此,请将对象包装在另一个类中,该类在序列化时重新加载该对象。如果您随后将该对象持久化到群集中,则Dask会保留该对象,且最多在每个节点上调用一次创建函数。

例如(未测试):

from dask import delayed

class Wrapper(object):
    def __init__(self, func):
        self.func = func
        self.filter = func()

    def __reduce__(self):
        # When unpickled, the filter will be reloaded
        return (Wrapper, (func,))


def load_filter():
    pass


# Create a delayed function to load the filter
wrapper = delayed(Wrapper)(load_filter)

# Optionally persist the wrapper in the cluster, to be reused over multiple computations
wrapper = wrapper.persist()

def has_match(row, wrapper):
    return wrapper.filter.matches(a=row.a, b=row.b)


ddf['match'] = ddf.apply(has_match, args=(wrapper,), axis=1, meta=('matches', bool))

这篇关于如何使自定义对象可用于传递给Daskdf.Apply的函数(无法序列化)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆