PySpark: Best practice to add more columns to a DataFrame


Question

Spark DataFrames have a method, withColumn, that adds one new column at a time. To add multiple columns, a chain of withColumn calls is required. Is this the best practice for doing this?
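
For context, here is a minimal sketch of the DataFrame-only version I am asking about; the expressions and the filter condition are made up for illustration, only the column names match my actual code:

from pyspark.sql import functions as F

# Chained withColumn calls, one new column at a time, followed by a filter
df2 = (df
       .withColumn("new_col_1", F.concat(F.col("existing_col_2"), F.lit("_NEW")))
       .withColumn("new_col_2", F.col("existing_col_2"))
       .filter(F.col("existing_col_1").isNotNull()))

# The same thing expressed as a single select; both forms typically
# compile to the same physical plan
df2 = df.select(
    "existing_col_1",
    "existing_col_2",
    F.concat(F.col("existing_col_2"), F.lit("_NEW")).alias("new_col_1"),
    F.col("existing_col_2").alias("new_col_2"),
).filter(F.col("existing_col_1").isNotNull())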

I feel that using mapPartitions has more advantages. Let's say I have a chain of three withColumn calls and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure whether any of them are wide transformations, though). But if I do a mapPartitions, I can do it all in one go. It also helps if I have a database connection that I would prefer to open once per RDD partition.

My question has two parts.

The first part: this is my implementation with mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do this?

from pyspark.sql import Row

def add_new_cols(rows):
    # one database connection per partition
    db = open_db_connection()
    new_rows = []
    # Row "class" carrying the output column names
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        # look up a value for this row and derive the two new columns from it
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

The second part: what are the tradeoffs of using mapPartitions over a chain of withColumn and filter?

I read somewhere that using the available methods on Spark DataFrames is always better than rolling out your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.

Answer

Are there any unforeseen issues with this approach?

Multiple. The most severe implications are:

  • A memory footprint a few times higher compared to plain DataFrame code, and significant garbage collection overhead.
  • The high cost of serialization and deserialization required to move data between execution contexts.
  • Introducing a breaking point in the query planner.
  • As written, the cost of schema inference on the toDF call (which can be avoided if a proper schema is provided; see the sketch after this list) and possible re-execution of all preceding steps.
  • And so on...
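
To illustrate the schema point, a sketch of passing an explicit schema to toDF so that inference is skipped; the StringType columns are an assumption, since the question only shows string-like values:

from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema matching the Rows produced by add_new_cols
schema = StructType([
    StructField("existing_col_1", StringType(), True),
    StructField("existing_col_2", StringType(), True),
    StructField("new_col_1", StringType(), True),
    StructField("new_col_2", StringType(), True),
])

# With the schema supplied, toDF does not need to sample the data to infer types
df2 = df.rdd.mapPartitions(add_new_cols).toDF(schema)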

Some of these can be avoided with a udf and select / withColumn, others cannot.
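
For example, the plain string manipulation from the question can stay inside the DataFrame API as a udf; this is only a sketch, and the database lookup is deliberately left out of it:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Per-row Python logic wrapped in a udf; this covers only the "_NEW" suffix
# part of the question's code, not the database lookup
@F.udf(returnType=StringType())
def with_suffix(value):
    return None if value is None else value + "_NEW"

df2 = df.withColumn("new_col_1", with_suffix(F.col("existing_col_2")))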

Let's say I have a chain of three withColumn calls and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure whether any of them are wide transformations, though). But I can do it all in one go if I do a mapPartitions

Your mapPartitions does not remove any of these operations, and it does not provide any optimization that the Spark planner could not apply on its own. Its only advantage is that it provides a nice scope for expensive connection objects.
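
One way to check this yourself is to look at the plan Spark produces for the chained version (a sketch using the question's column names; no output shown here):

from pyspark.sql import functions as F

# The chained projections and the filter are all narrow operations, so the
# planner fuses them rather than executing four separate passes over the data
(df
 .withColumn("new_col_1", F.concat(F.col("existing_col_2"), F.lit("_NEW")))
 .withColumn("new_col_2", F.col("existing_col_2"))
 .filter(F.col("existing_col_1").isNotNull())
 .explain(True))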

I read somewhere that using the available methods on Spark DataFrames is always better than rolling out your own implementation

Once you start using executor-side Python logic, you have already diverged from Spark SQL. It doesn't matter whether you use a udf, an RDD, or the newly added vectorized udf. At the end of the day, you should make the decision based on the overall structure of your code: if it is predominantly Python logic executed directly on the data, it might be better to stick with RDDs or to skip Spark completely.
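
For reference, the "newly added vectorized udf" is pandas_udf; a minimal sketch of the same made-up suffix logic in vectorized form, assuming Spark 3.x-style type hints and non-null string values:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# A vectorized (pandas) udf receives whole pandas Series per batch instead of
# single rows, which reduces serialization overhead between the JVM and Python
@F.pandas_udf(StringType())
def with_suffix_vectorized(values: pd.Series) -> pd.Series:
    return values + "_NEW"

df2 = df.withColumn("new_col_1", with_suffix_vectorized(F.col("existing_col_2")))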

If it is just a small fraction of the logic and doesn't cause severe performance issues, don't sweat it.
