PySpark: Using Object in RDD
Problem Description
I am currently learning Python and want to apply it on/with Spark. I have this very simple (and useless) script:
import sys
from pyspark import SparkContext


class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v


if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))

    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())
Using

spark-submit CustomClass.py
...the following error is thrown (output shortened):
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)...
To me, the statement
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
seems to be important. It means that the class instances can't be serialized, right? Do you know how to solve this issue?
Thanks and regards
Recommended Answer
There are a number of issues:

- If you put MyClass in a separate file, it can be pickled. This is a common problem with many Python uses of pickle, and it is simple to solve by moving MyClass into its own module and using from myclass import MyClass. Normally dill can fix such issues (as in import dill as pickle), but it didn't work for me here. (If you submit to a cluster rather than run locally, see the sketch after this list for shipping that module to the workers.)
- Once this is solved, your reduce still doesn't work, because calling addValue returns None (there is no return statement), not an instance of MyClass. You need to change addValue to return self.
- Finally, the lambda needs to call getValue, so it should be a.addValue(b.getValue()).
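When the job runs on an actual cluster rather than in local mode, the separate module also has to reach the executors so they can import MyClass while unpickling. A minimal sketch of one way to do that, assuming the module file is named myclass.py and sits next to the driver script:

from pyspark import SparkContext

sc = SparkContext(appName="WordCount")
# Ship myclass.py to the executors so "from myclass import MyClass"
# can also be resolved on the worker side (the path is an assumption here).
sc.addPyFile("myclass.py")
from myclass import MyClass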
All together:
myclass.py
class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)
        return self

    def getValue(self):
        return self.v
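Because addValue now returns self, the calls chain, which is exactly what the reduce relies on. A quick local check (my own illustration, run outside Spark):

from myclass import MyClass

c = MyClass(1).addValue(2).addValue(3)  # chaining works since addValue returns self
print(c.getValue())  # prints "123"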
main.py
import sys
from pyspark import SparkContext
from myclass import MyClass


if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))

    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
    print(reduzed.collect())
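Note that collect() brings back the MyClass instances themselves, so the printout shows object reprs rather than the concatenated strings. An optional addition (my own sketch, not part of the original answer) that makes the result readable is to pull the values out with getValue first:

# Extract the concatenated strings before collecting, so the output is
# readable instead of a list of <myclass.MyClass object at 0x...> entries.
print(reduzed.mapValues(lambda c: c.getValue()).collect())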