PySpark: Using Object in RDD

Problem description

I am currently learning Python and want to apply it with Spark. I have this very simple (and useless) script:

import sys
from pyspark import SparkContext

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())

Using

spark-submit CustomClass.py

...the following error is thrown (output shortened):

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)...

To me, the statement

PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

seems to be the important part. It means that the class instances can't be serialized, right? Do you know how to solve this issue?

Thanks and regards

Answer

There are a number of issues:

  • If you put MyClass in a separate file, it can be pickled. This is a common problem with many Python uses of pickle, and it is easy to solve by moving MyClass into its own module and then using from myclass import MyClass. Normally dill can fix these issues (as in import dill as pickle), but it didn't work for me here.
  • Once this is solved, your reduce still doesn't work, because calling addValue returns None (it has no return statement), not an instance of MyClass. You need to change addValue to return self.
  • Finally, the lambda needs to call getValue, so it should be a.addValue(b.getValue()).

All together: myclass.py

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)
        return self

    def getValue(self):
        return self.v
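
As a quick aside, a minimal local check (a hypothetical snippet, run without Spark and not part of the answer's files) shows why returning self matters: the result of addValue stays a MyClass instance, which is exactly what reduceByKey relies on when it keeps folding values together.

# quick local sanity check, no Spark needed
from myclass import MyClass

a = MyClass(1)
merged = a.addValue(MyClass(2).getValue())  # addValue returns self, so merged is still a MyClass
print(merged.getValue())                    # prints "12"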

main.py

import sys
from pyspark import SparkContext
from myclass import MyClass

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
    print(reduzed.collect())
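
Note that collect() returns (key, MyClass) pairs, so printing them shows default object reprs. To see the concatenated strings instead, one option (a small, hypothetical addition, not part of the original answer) is to map the values through getValue() before collecting:

print(reduzed.mapValues(lambda c: c.getValue()).collect())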
