How to process RDDs using a Python class?


Problem description


I'm implementing a model in Spark as a Python class, and any time I try to map a class method to an RDD it fails. My actual code is more complicated, but this simplified version gets at the heart of the problem:

class model(object):
    def __init__(self):
        self.data = sc.textFile('path/to/data.csv')
        # other misc setup
    def run_model(self):
        self.data = self.data.map(self.transformation_function)
    def transformation_function(self,row):
        row = row.split(',')
        return row[0]+row[1]


Now, if I run the model like so (for example):

test = model()
test.run_model()
test.data.take(10)

I get the following error:


Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


I've played with this a bit, and it seems to occur reliably any time I try to map a class method to an RDD within the class. I have confirmed that the mapped function works fine if I implement it outside of a class structure, so the problem definitely has to do with the class. Is there a way to resolve this?

Recommended answer


The problem here is a little more subtle than using nested RDDs or performing Spark actions inside transformations. Spark doesn't allow access to the SparkContext inside an action or transformation.


Even if you don't access it explicitly, it is referenced inside the closure and has to be serialized and shipped around. That means your transformation method, which references self, drags the SparkContext along with it, hence the error.
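The instance-dragging can be reproduced without Spark at all. In the sketch below (my illustration, not from the original answer), a `threading.Lock` stands in for the unpicklable `SparkContext`: pickling the bound method forces pickling of the whole instance, which fails on the unpicklable attribute.

```python
import pickle
import threading

class Model(object):
    def __init__(self):
        # stand-in for SparkContext: an object that cannot be pickled
        self.ctx = threading.Lock()

    def transformation_function(self, row):
        row = row.split(',')
        return row[0] + row[1]

m = Model()

# Pickling the bound method pickles the instance behind it,
# including the unpicklable self.ctx attribute.
try:
    pickle.dumps(m.transformation_function)
    print("pickled fine")
except TypeError as e:
    print("failed:", e)
```

Spark serializes mapped functions with a pickle variant (cloudpickle), so the same instance-dragging happens when you call `rdd.map(self.transformation_function)`.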


One way to handle this is to use a static method:

class model(object):
    @staticmethod
    def transformation_function(row):
        # no reference to self, so serializing this function
        # doesn't drag the instance (and its SparkContext) along
        row = row.split(',')
        return row[0]+row[1]

    def __init__(self):
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function)
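Because the static method no longer references `self`, serializing it does not pull in the instance or anything it holds. A quick check with plain `pickle` (my sketch; Spark uses a cloudpickle variant, but the principle is the same):

```python
import pickle

class Model(object):
    @staticmethod
    def transformation_function(row):
        row = row.split(',')
        return row[0] + row[1]

# a plain function pickles by qualified name; no instance is involved
restored = pickle.loads(pickle.dumps(Model.transformation_function))
print(restored('a,b,c'))  # -> ab
```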

Edit:


If you want to be able to access instance variables, you can try something like this:

class model(object):
    @staticmethod
    def transformation_function(a_model):
        # read the attribute once, on the driver; the inner function
        # closes over the plain string, not over a_model itself
        delim = a_model.delim
        def _transformation_function(row):
            return row.split(delim)
        return _transformation_function

    def __init__(self):
        self.delim = ','
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function(self))
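The factory works because the returned function captures only the string `delim`, not the instance. The same effect can be had by copying the attribute into a local variable inside the method, a common PySpark idiom; below is a Spark-free sketch (my illustration) where Python's built-in `map` stands in for `RDD.map`:

```python
class model(object):
    def __init__(self, delim=','):
        self.delim = delim

    def run_model(self, lines):
        # copy the attribute into a local; the lambda then captures
        # only this string, not self, so no SparkContext is dragged in
        delim = self.delim
        return list(map(lambda row: row.split(delim), lines))

m = model()
print(m.run_model(['a,b', 'c,d']))  # -> [['a', 'b'], ['c', 'd']]
```

In real PySpark this would be `self.data.map(lambda row: row.split(delim))`; cloudpickle can serialize such a lambda because its closure holds only the string.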

