Spark __getnewargs__ error


Question

I am trying to clean a Spark DataFrame by mapping it to an RDD and then back to a DataFrame. Here's a toy example:

from pyspark.sql import Row

def replace_values(row, sub_rules):
    """Apply (column, old_value, new_value) substitution rules to a Row."""
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in ex.columns]))
    .toDF(schema=ex.schema))

Running the code above results in a Py4JError with a very long stack trace ending in the following:

Py4JError: An error occurred while calling o801.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

What is going on here? How do I fix it? I am using PySpark 1.5.2.

Answer

The error is caused by the reference to ex.columns inside the .map(lambda ...) statement. You can't reference an RDD (or the DataFrame wrapping it) inside the function being used in an RDD transformation. Spark is supposed to issue a more helpful error in this case, but apparently that didn't make it into this version.
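For context on why the failure surfaces as a missing __getnewargs__ method: Spark pickles the function passed to .map() and ships it to the executors, so everything the lambda closes over must be picklable. A DataFrame wraps a py4j JavaObject, and pickle's probe for __getnewargs__ is forwarded to the JVM, which has no such method. A minimal sketch of the failing versus working pattern (illustrative code, not from the original post):

# Fails: the lambda closes over the DataFrame `ex`, so pickling the
# closure tries to pickle the DataFrame and its py4j JavaObject.
# ex.map(lambda row: len(ex.columns)).count()  # Py4JError: __getnewargs__

# Works: pull the column names out on the driver first; the closure
# then captures only a plain list of strings, which pickles cleanly.
column_names = ex.columns
ex.map(lambda row: len(column_names)).count()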

The solution is to replace the reference with a copy of the referenced variable:

import copy
from pyspark.sql import Row

def replace_values(row, sub_rules):
    """Apply (column, old_value, new_value) substitution rules to a Row."""
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
# Copy the column names into a plain Python list on the driver so the
# lambda below closes over the copy, not over the DataFrame itself.
cols = copy.deepcopy(ex.columns)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in cols]))
    .toDF(schema=ex.schema))
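As a quick sanity check (hypothetical session; with the toy data above, only Alice's age matches a (col, 1, 3) rule):

fixed = (ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in cols]))
           .toDF(schema=ex.schema))
fixed.show()
# Expected output (column order follows the inferred schema):
# +---+-----+
# |age| name|
# +---+-----+
# |  3|Alice|
# |  2|  Bob|
# +---+-----+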
