Using Python's reduce() to join multiple PySpark DataFrames
Question
Does anyone know why using Python3's functools.reduce() would lead to worse performance when joining multiple PySpark DataFrames than just iteratively joining the same DataFrames using a for loop? Specifically, this gives a massive slowdown followed by an out-of-memory error:
import functools

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns),
    list_of_dataframes,
)
whereas this does not:
joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)
Any ideas would be greatly appreciated. Thanks!
Answer
One reason is that a reduce or a fold is usually functionally pure: the result of each accumulation operation is not written to the same part of memory, but rather to a new block of memory.
In principle, the garbage collector could free the previous block after each accumulation, but if it doesn't, you'll allocate memory for each updated version of the accumulator.
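For illustration only (this is not part of the original answer): if the explicit cache() call in the for-loop version accounts for part of the difference, the same idea can be folded into the reduce-based approach by caching each intermediate join. The join_and_cache helper below is a hypothetical sketch; whether caching every intermediate result helps or simply shifts the memory pressure depends on the executor memory available.

import functools

def join_and_cache(list_of_join_columns, left_df, right_df):
    # Hypothetical variant of the poster's join function: join as before,
    # then cache the intermediate result so later steps can reuse it
    # instead of recomputing the growing lineage.
    joined = left_df.join(right_df, on=list_of_join_columns)
    return joined.cache()

joined_df = functools.reduce(
    functools.partial(join_and_cache, list_of_join_columns),
    list_of_dataframes,
)

If memory is tight, each intermediate DataFrame can also be released with unpersist() once the next join has been materialized.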