Spark RuntimeError: uninitialized classmethod object


Problem description

I wrote a very simple piece of Spark code in Python:

import collections
Person = collections.namedtuple('Person', ['name', 'age', 'gender'])

a = sc.parallelize([['Barack Obama', 54, 'M'], ['Joe Biden', 74, 'M']])
a = a.map(lambda row: Person(*row))  # wrap each raw row in the namedtuple

print a.collect()

def func(row):
    # return a copy of the row with the name field replaced
    tmp = row._replace(name='Jack Rabbit')
    return tmp

print a.map(func).collect()

I get the following output and error:

[Person(name='Barack Obama', age=54, gender='M'), Person(name='Joe Biden', age=74, gender='M')]

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in stage 11.0 failed 4 times, most recent failure: Lost task 49.3 in stage 11.0 (TID 618, 172.19.75.121): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/etc/spark-1.4.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/etc/spark-1.4.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/etc/spark-1.4.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-19-f0b4885784cb>", line 2, in func
  File "<string>", line 32, in _replace
RuntimeError: uninitialized classmethod object

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

However, if I run the following piece of code, I don't get any error:

for row in a.collect():
  func(row)

What should I do?

Answer

Why doesn't it work? Because the namedtuple call creates a new class at runtime, and classes are not serialized as part of a closure in Spark. This means you have to define it in a separate module* and make sure that module is available on the workers:

txt = "\n".join(["import collections",
    "Person = collections.namedtuple('Person', ['name', 'age', 'gender'])"])

with open("persons.py", "w") as fw:
    fw.write(txt)

sc.addPyFile("persons.py")  # Ship module to the worker nodes

Next you can simply import it and everything should work as expected:

import persons

a.map(func).collect()
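
One caveat: an RDD that was already built with the Person class defined in the shell still drags the broken class along in its closures, so importing the module does not retroactively repair it. To be safe, rebuild the rows from the module-level class. A minimal sketch, reusing func and the data from the question:

import persons

a = sc.parallelize([['Barack Obama', 54, 'M'], ['Joe Biden', 74, 'M']])
a = a.map(lambda row: persons.Person(*row))  # the class now lives in a real module

print a.map(func).collect()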

On a side note, the leading underscore in _replace is there for a reason (namedtuple prefixes its helper methods with an underscore to keep them from colliding with field names).

* It could be done dynamically, e.g. a.map(lambda row: collections.namedtuple('Person', ['name', 'age', 'gender'])(*row)), or by defining Person inside mapPartitions, but it is neither pretty nor efficient.
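
For completeness, the mapPartitions variant could look roughly like this (a sketch; to_persons is a name introduced here, and the class is recreated once per partition rather than shipped from the driver):

import collections

def to_persons(rows):
    Person = collections.namedtuple('Person', ['name', 'age', 'gender'])
    for row in rows:
        yield Person(*row)

b = sc.parallelize([['Barack Obama', 54, 'M'], ['Joe Biden', 74, 'M']]).mapPartitions(to_persons)

# Convert back to plain tuples before collecting, so the driver does not
# have to unpickle instances of a class that only exists on the executors.
print b.map(func).map(tuple).collect()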
