Spark __getnewargs__ error
Problem description
I am trying to clean a Spark DataFrame by mapping it to RDD then back to DataFrame. Here's a toy example:
from pyspark.sql import Row

def replace_values(row, sub_rules):
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in ex.columns]))
   .toDF(schema=ex.schema))
Running the code above results in a Py4JError with a very long stack trace ending in the following:
Py4JError: An error occurred while calling o801.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
What is going on here? How do I fix it? I am using PySpark 1.5.2.
Recommended answer
The error is caused by the reference to ex.columns inside the .map(lambda ...) statement. You can't reference an RDD or DataFrame inside the function being used in an RDD transformation: Spark has to serialize that function to ship it to the workers, and a DataFrame (which wraps a Py4J JVM handle) cannot be serialized. Spark is supposed to issue a more helpful error in this case, but apparently that didn't make it into this version.
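The serialization failure can be illustrated outside Spark with plain pickle. The classes below are hypothetical stand-ins, not PySpark's actual internals: an object holding a non-picklable JVM proxy (like a DataFrame) fails to serialize, while a plain copy of just its column list serializes fine.

```python
import copy
import pickle

class GatewayProxy:
    """Hypothetical stand-in for the Py4J JVM handle a DataFrame holds."""
    def __reduce__(self):
        raise TypeError("cannot pickle Py4J gateway objects")

class FakeDataFrame:
    """Hypothetical stand-in for a PySpark DataFrame."""
    def __init__(self, columns):
        self._jdf = GatewayProxy()   # non-serializable JVM reference
        self.columns = columns

df = FakeDataFrame(['age', 'name'])

# Closing over the whole DataFrame means pickling it -- this fails:
try:
    pickle.dumps(df)
    shipped = True
except TypeError:
    shipped = False

# A plain local copy of the column names pickles without trouble:
cols = copy.deepcopy(df.columns)
restored = pickle.loads(pickle.dumps(cols))
```

Spark's real closure serializer is more elaborate than stdlib pickle, but the principle is the same: only plain, serializable values can travel to the workers.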
The solution is to replace the reference with a copy of the referenced value:
import copy
from pyspark.sql import Row

def replace_values(row, sub_rules):
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)

# Copy the column names into a plain local list before the transformation,
# so the lambda closes over the list rather than the DataFrame itself.
cols = copy.deepcopy(ex.columns)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in cols]))
   .toDF(schema=ex.schema))
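The substitution logic in replace_values can be sanity-checked without a Spark cluster by running it on plain dicts (inside the function, a Row is converted to a dict anyway). This sketch mirrors the function body; replace_values_dict is just an illustrative local name:

```python
def replace_values_dict(d, sub_rules):
    """Same substitution logic as replace_values, applied to a plain dict."""
    out = dict(d)
    for col, old_val, new_val in sub_rules:
        if out[col] == old_val:
            out[col] = new_val
    return out

# Rules built the same way as in the Spark snippet: replace 1 with 3 in every column.
rules = [(col, 1, 3) for col in ['name', 'age']]

print(replace_values_dict({'name': 'Alice', 'age': 1}, rules))  # age 1 -> 3
print(replace_values_dict({'name': 'Bob', 'age': 2}, rules))    # unchanged
```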