How can I load data that can't be pickled in each Spark executor?


Question

I'm using the NoAho library which is written in Cython. Its internal trie cannot be pickled: if I load it on the master node, I never get matches for operations that execute in workers.

Since I would like to use the same trie in each Spark executor, I found a way to load the trie lazily, inspired by this spaCy on Spark issue.

global trie

def get_match(text):
    # 1. Load trie if needed
    global trie
    try:
        trie
    except NameError:
        from noaho import NoAho

        trie = NoAho()
        trie.add(key_text='ms windows', payload='Windows 2000')
        trie.add(key_text='ms windows 2000', payload='Windows 2000')
        trie.add(key_text='windows 2k', payload='Windows 2000')
        ...

    # 2. Find an actual match to get the payload back
    return trie.findall_long(text)

While this works, all the .add() calls are performed for every Spark job, which takes around one minute. Since I'm not sure "Spark job" is the correct term, I'll be more explicit: I use Spark in a Jupyter notebook, and every time I run a cell that needs the get_match() function the trie is not cached, so rebuilding it takes about a minute, which dominates the run time.

Is there anything I can do to ensure the trie gets cached? Or is there a better solution to my problem?

Answer

One thing you can try is to use a singleton module to load and initialize the trie. Basically all you need is a separate module with something like this:

  • trie_loader.py

from noaho import NoAho

def load():
    trie = NoAho()
    trie.add('ms windows', 'Windows 2000')
    trie.add('ms windows 2000', 'Windows 2000')
    trie.add('windows 2k', 'Windows 2000')
    return trie

# Built once per Python process, when the module is first imported
trie = load()

and distribute this using standard Spark tools:

sc.addPyFile("trie_loader.py")
import trie_loader

rdd = sc.parallelize(["ms windows", "Debian GNU/Linux"])
rdd.map(lambda x: (x, trie_loader.trie.find_long(x))).collect()
## [('ms windows', (0, 10, 'Windows 2000')),
##  ('Debian GNU/Linux', (None, None, None))]
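
If you want to keep the question's get_match() interface, a minimal sketch of how the pieces could fit together (my adaptation, not part of the original answer) would simply delegate to the module-level trie; it uses findall_long() as in the question, whereas the example above uses find_long():

# Sketch: reuse the module-level trie built by trie_loader on import.
# The import is essentially free after the first call in a given Python
# worker process, so the trie is not rebuilt for every notebook cell.
def get_match(text):
    import trie_loader
    return trie_loader.trie.findall_long(text)

rdd = sc.parallelize(["ms windows 2000 installation", "Debian GNU/Linux"])
rdd.map(get_match).collect()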

This should load the required data every time an executor's Python process is started, instead of loading it when the data is accessed. I'm not sure whether it will help here, but it's worth a try.
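
One related knob worth mentioning (my addition, not something verified for this setup): how long the module-level trie survives depends on whether executor Python worker processes are reused across tasks, which is controlled by the spark.python.worker.reuse setting (enabled by default):

from pyspark import SparkConf, SparkContext

# Hypothetical configuration sketch: keep Python worker reuse enabled so the
# imported trie_loader module (and its trie) persists across tasks and jobs.
conf = SparkConf().set("spark.python.worker.reuse", "true")  # default: true
sc = SparkContext(conf=conf)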
