PySpark Throwing error Method __getnewargs__([]) does not exist


Question

I have a set of files. The paths to the files are saved in a file, say all_files.txt. Using apache spark, I need to do an operation on all the files and club the results.

The steps that I want to do are:

  • Create an RDD by reading all_files.txt
  • For each line in all_files.txt (each line is a path to some file), read the contents of each of the files into a single RDD
  • Then do an operation on all the contents

This is the code I wrote for that:

def return_contents_from_file(file_name):
    return spark.read.text(file_name).rdd.map(lambda r: r[0])

def run_spark():
    file_name = 'path_to_file'

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    # map: this line is supposed to return the paths to each file
    # first flatMap: here I am expecting to club all the contents of all files
    # second flatMap: here I am expecting to do an operation on each line of all files
    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \
        .flatMap(return_contents_from_file) \
        .flatMap(do_operation_on_each_line_of_all_files)

This is throwing the error:

line 323, in get_return_value py4j.protocol.Py4JError: An error occurred while calling o25.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)

Can someone please tell me what I am doing wrong and how I should proceed further? Thanks in advance.

Answer

Using spark inside flatMap, or inside any transformation that occurs on executors, is not allowed (the SparkSession is available on the driver only). It is also not possible to create an RDD of RDDs (see: Is it possible to create nested RDDs in Apache Spark?)
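The reason this surfaces as a py4j error is serialization: Spark pickles the functions it sends to executors, and a SparkSession captured in a closure cannot be pickled because it wraps a live py4j gateway connection. As a rough analogy (plain Python, no Spark involved), trying to pickle any object that wraps live process state fails the same way:

```python
import pickle
import threading

# A thread lock wraps live OS state, loosely analogous to how a SparkSession
# wraps a live py4j gateway - neither can be serialized with pickle.
lock = threading.Lock()

try:
    pickle.dumps(lock)
except TypeError as e:
    print("serialization failed:", e)  # cannot pickle '_thread.lock' object
```

Spark hits an equivalent wall when `return_contents_from_file` closes over `spark` and is shipped to executors via `flatMap`.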

But you can achieve this transformation in another way - read all the content of all_files.txt into a dataframe, use a local map to turn each path into a dataframe, and a local reduce to union them all; see the example:

>>> from functools import reduce  # required on Python 3, where reduce is not a builtin
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
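The shape of that fix - collect the file names on the driver, then chain plain Python's map and reduce instead of Spark transformations - can be sketched without Spark at all. The file names and contents below are made up for illustration; each list of strings stands in for one per-file dataframe:

```python
from functools import reduce

# Hypothetical stand-in data: file name -> that file's lines
# (in the real code, spark.read.text(path) yields a DataFrame instead).
contents = {
    "a.txt": ["line 1", "line 2"],
    "b.txt": ["line 3"],
}

filenames = ["a.txt", "b.txt"]                         # ~ the collected rows of all_files.txt
per_file = map(lambda name: contents[name], filenames)  # local map, runs on the driver
all_lines = reduce(lambda x, y: x + y, per_file)        # local reduce, ~ the unionAll chain
print(all_lines)  # ['line 1', 'line 2', 'line 3']
```

As a side note, `DataFrameReader.text` also accepts a list of paths, so `spark.read.text([r[0] for r in filenames])` may be an even shorter route if your Spark version supports it.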

