Using Python's reduce() to join multiple PySpark DataFrames


Question

Does anyone know why using Python3's functools.reduce() would lead to worse performance when joining multiple PySpark DataFrames than just iteratively joining the same DataFrames using a for loop? Specifically, this gives a massive slowdown followed by an out-of-memory error:

import functools

def join_dataframes(list_of_join_columns, left_df, right_df):
    # Join two DataFrames on the shared list of join columns.
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns),
    list_of_dataframes,
)

while this does not:

joined_df = list_of_dataframes[0]
joined_df.cache()  # cache the starting DataFrame before the iterative joins
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)
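
For reference, a minimal self-contained sketch of both variants, assuming a local SparkSession and a hypothetical list of three small DataFrames that share an "id" join column (the real DataFrames are much larger), looks like this:

import functools

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("reduce-join-demo").getOrCreate()

# Hypothetical example data: three DataFrames sharing the "id" join column.
list_of_join_columns = ["id"]
list_of_dataframes = [
    spark.createDataFrame([(1, "a"), (2, "b")], ["id", "col_a"]),
    spark.createDataFrame([(1, 10), (2, 20)], ["id", "col_b"]),
    spark.createDataFrame([(1, True), (2, False)], ["id", "col_c"]),
]

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

# reduce() variant
reduced_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns),
    list_of_dataframes,
)

# for-loop variant
looped_df = list_of_dataframes[0]
looped_df.cache()
for right_df in list_of_dataframes[1:]:
    looped_df = looped_df.join(right_df, on=list_of_join_columns)

reduced_df.show()
looped_df.show()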

Any ideas would be greatly appreciated. Thanks!

Answer

One reason is that a reduce or a fold is usually functionally pure: the result of each accumulation operation is not written to the same part of memory, but rather to a new block of memory.

In principle the garbage collector could free the previous block after each accumulation, but if it doesn't, you'll allocate memory for each updated version of the accumulator.
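
To make that accumulation pattern concrete, here is a simplified, hypothetical stand-in for what functools.reduce does internally (not the real CPython implementation, just a sketch): each step rebinds the accumulator name to a brand-new object instead of updating one in place.

def reduce_sketch(function, iterable):
    # Illustration only: a simplified stand-in for functools.reduce.
    iterator = iter(iterable)
    accumulator = next(iterator)  # start from the first element
    for element in iterator:
        # Each step produces a brand-new accumulator (in the PySpark case, a new
        # DataFrame wrapping an ever-larger join plan); the previous version is
        # only reclaimed once nothing references it anymore.
        accumulator = function(accumulator, element)
    return accumulator

If those intermediate versions are not collected promptly, memory usage grows with every join in the chain.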

