PySpark Throwing error Method __getnewargs__([]) does not exist
Problem Description
I have a set of files. The paths to the files are saved in a file, say all_files.txt. Using Apache Spark, I need to do an operation on all the files and club the results.
The steps that I want to do are:
- Create an RDD by reading all_files.txt
- For each line in all_files.txt (each line is a path to some file), read the contents of each of the files into a single RDD
- Then do an operation on all contents
This is the code I wrote for the same:
def return_contents_from_file(file_name):
    return spark.read.text(file_name).rdd.map(lambda r: r[0])

def run_spark():
    file_name = 'path_to_file'
    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()
    # the first map is supposed to return the path to each file;
    # the first flatMap should club all the contents of all files;
    # the second flatMap should do an operation on each line of all files
    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \
        .flatMap(return_contents_from_file) \
        .flatMap(do_operation_on_each_line_of_all_files)
This is throwing the error:
line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o25.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Can someone please tell me what I am doing wrong and how I should proceed further? Thanks in advance.
Using spark inside flatMap, or in any transformation that occurs on the executors, is not allowed (the spark session is available on the driver only). It is also not possible to create an RDD of RDDs (see: Is it possible to create nested RDDs in Apache Spark?).
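The underlying reason is serialization: Spark pickles every function passed to flatMap together with the objects it references and ships them to the executors, and the SparkSession wraps a live Py4J gateway that cannot be pickled; the Py4JError above is that serialization attempt failing. A rough plain-Python analogy (DriverOnlyResource is a made-up stand-in, not a Spark class):

```python
import pickle
import threading

class DriverOnlyResource:
    """Hypothetical stand-in for a SparkSession: it holds a live handle
    (a thread lock here; a JVM gateway in Py4J) that pickle cannot serialize."""
    def __init__(self):
        self.gateway = threading.Lock()

resource = DriverOnlyResource()

# Spark would have to serialize everything a flatMap function references;
# objects holding live connections fail at exactly this step.
try:
    pickle.dumps(resource)
    shipped = True
except TypeError as error:
    shipped = False
    print(f"cannot ship to executors: {error}")
```

The same failure happens whether the session is referenced directly or captured inside a closure, which is why return_contents_from_file above cannot call spark.read on an executor.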
But you can achieve this transformation in another way - read all the contents of all_files.txt into a dataframe, use a local map to turn each path into a dataframe, and a local reduce to union them all. See the example:
>>> from functools import reduce  # in Python 3, reduce is no longer a builtin
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
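The local map plus local reduce pattern itself can be sketched with plain Python lists standing in for dataframes (note that reduce comes from functools in Python 3; the file names and lines below are made up for illustration):

```python
from functools import reduce

# Made-up stand-in for spark.read.text: each path maps to that file's lines.
fake_files = {
    "a.txt": ["line1", "line2"],
    "b.txt": ["line3"],
}

filenames = list(fake_files)                               # like collecting all_files.txt
datasets = map(lambda name: fake_files[name], filenames)   # local map: one dataset per file
all_lines = reduce(lambda d1, d2: d1 + d2, datasets)       # local reduce: union everything

print(all_lines)  # → ['line1', 'line2', 'line3']
```

Both the map and the reduce run entirely on the driver, so no SparkSession ever needs to be serialized; only the resulting unioned dataframe is distributed.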