PySpark Throwing error Method __getnewargs__([]) does not exist

This article covers how to deal with the PySpark error "Method __getnewargs__([]) does not exist".

Problem Description

I have a set of files. The paths to the files are saved in a file, say all_files.txt. Using Apache Spark, I need to do an operation on all the files and combine the results.

The steps that I want to do are:

  • Create an RDD by reading all_files.txt
  • For each line in all_files.txt (each line is a path to some file), read the contents of each file into a single RDD
  • Then do an operation on all the contents

This is the code I wrote for the same:

from pyspark.sql import SparkSession

def return_contents_from_file(file_name):
    return spark.read.text(file_name).rdd.map(lambda r: r[0])

def run_spark():
    file_name = 'path_to_file'

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    counts = (spark.read.text(file_name).rdd.map(lambda r: r[0])  # this line is supposed to return the path to each file
              .flatMap(return_contents_from_file)  # here I am expecting to combine the contents of all files
              .flatMap(do_operation_on_each_line_of_all_files))  # here I am expecting to do an operation on each line of all files

This is throwing the error:

line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o25.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

Can someone please tell me what I am doing wrong and how I should proceed further. Thanks in advance.

Solution

Using spark inside flatMap, or in any other transformation that runs on the executors, is not allowed (the SparkSession is available on the driver only). It is also not possible to create an RDD of RDDs (see: Is it possible to create nested RDDs in Apache Spark?).
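For illustration, here is a minimal sketch of the failing pattern (not the original code; it assumes a SparkSession named spark already exists, and the file paths are made up). Any function passed to a transformation is pickled and shipped to the executors, and a closure that captures spark drags the underlying Py4J JavaObject into the pickle; the pickle machinery then looks up __getnewargs__ on that JVM-backed object, which is exactly the call that fails above:

>>> paths = spark.sparkContext.parallelize(["a.txt", "b.txt"])  # hypothetical paths
>>> # the lambda below captures `spark`, so it cannot be serialized for the executors
>>> lines = paths.flatMap(lambda p: spark.read.text(p).rdd.map(lambda r: r[0]))
>>> lines.collect()  # the Py4JError above surfaces once the closure is serialized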

But you can achieve this transformation in another way: read the content of all_files.txt to the driver, use a local map to make a dataframe for each path, and a local reduce to union them all. For example:

>>> from functools import reduce  # needed on Python 3, where reduce is no longer a builtin
>>> filenames = spark.read.text('all_files.txt').collect()  # list of Rows, one file path per line
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
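From there you can drop back to an RDD and apply the per-line operation from the question to the driver-built union. A sketch, assuming do_operation_on_each_line_of_all_files (the questioner's function, not defined here) takes one line and returns an iterable of results:

>>> all_lines_rdd = all_lines_df.rdd.map(lambda r: r[0])  # one plain string per line, across all files
>>> results = all_lines_rdd.flatMap(do_operation_on_each_line_of_all_files)
>>> results.take(5)  # or collect(), if the output is small enough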

