PySpark: Best practice to add more columns to a DataFrame

Question

Spark DataFrames have a withColumn method that adds one new column at a time. To add multiple columns, a chain of withColumn calls is required. Is this the best practice?

I feel that using mapPartitions has more advantages. Let's say I have a chain of three withColumn calls and then one filter to remove rows based on certain conditions. These are four different operations (I am not sure whether any of them are wide transformations, though). But I can do it all in one go with mapPartitions. It also helps when I have a database connection that I would prefer to open once per RDD partition.
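
For reference, the kind of withColumn/filter chain being compared here might look roughly like the sketch below; df stands for the question's DataFrame, and the column names, expressions, and filter condition are placeholders rather than part of the original question.

from pyspark.sql import functions as F

# Hypothetical chained version: three new columns, then a filter.
df_chained = (
    df
    .withColumn("new_col_1", F.concat(F.col("existing_col_2"), F.lit("_NEW")))
    .withColumn("new_col_2", F.col("existing_col_2"))
    .withColumn("new_col_3", F.length(F.col("existing_col_1")))
    .filter(F.col("new_col_3") > 0)
)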

My question has two parts.

The first part: this is my implementation with mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do this?

from pyspark.sql import Row

def add_new_cols(rows):
    # Open one connection per partition rather than per row.
    db = open_db_connection()
    new_rows = []
    # Row "template": calling it later creates rows with these fields.
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows (here: drop every third row)
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

The second part: what are the tradeoffs of using mapPartitions over a chain of withColumn and filter?

I read somewhere that using the methods available on Spark DataFrames is always better than rolling out your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.

Answer

Are there any unforeseen issues with this approach?

Multiple. The most severe implications are:

  • A memory footprint a few times higher than with plain DataFrame code, plus significant garbage-collection overhead.
  • The high cost of serialization and deserialization required to move data between execution contexts (the JVM and the Python workers).
  • Introducing a breaking point in the query planner.
  • As written, the cost of schema inference on the toDF call (which can be avoided if a proper schema is provided; see the sketch after this list) and possibly re-execution of all preceding steps.
  • And so on...
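
To illustrate the schema-inference point, toDF can be given an explicit schema so Spark does not have to sample the RDD to infer one. The field names and types below are assumptions based on the Row built in add_new_cols; adjust them to the real data.

from pyspark.sql.types import StructType, StructField, StringType

# Assumed schema matching the Row template used in add_new_cols.
schema = StructType([
    StructField("existing_col_1", StringType(), True),
    StructField("existing_col_2", StringType(), True),
    StructField("new_col_1", StringType(), True),
    StructField("new_col_2", StringType(), True),
])

df2 = df.rdd.mapPartitions(add_new_cols).toDF(schema)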

Some of these can be avoided with udf and select / withColumn, others cannot; a sketch of that alternative follows below.
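
A minimal sketch of what the udf-with-select/withColumn alternative might look like, assuming the per-row database lookup can be wrapped in a Python udf. open_db_connection and get_some_result are the placeholders from the question; how to avoid opening a connection per call is deliberately left out, since that scoping is exactly what mapPartitions provides.

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(StringType())
def lookup(value):
    # Placeholder lookup; note there is no per-partition scope here.
    db = open_db_connection()
    result = db.get_some_result(value)
    db.close()
    return result

df2 = (
    df
    .withColumn("new_col_2", lookup(F.col("existing_col_2")))
    .withColumn("new_col_1", F.concat(F.col("new_col_2"), F.lit("_NEW")))
    .filter(F.col("new_col_1").isNotNull())
)

This keeps the work inside the DataFrame API, so the planner still sees the projections and the filter, but the Python udf itself still incurs the serialization cost mentioned above.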

Let's say I have a chain of three withColumns and then one filter to remove rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions.

Your mapPartitions doesn't remove any of those operations, and it doesn't provide any optimization that the Spark planner couldn't apply on its own. Its only advantage is that it provides a nice scope for expensive connection objects.

I read somewhere that using the available methods with Spark DataFrames is always better than rolling out your own implementation.

Once you start using executor-side Python logic, you have already diverged from Spark SQL. It doesn't matter whether you use udf, RDD, or the newly added vectorized udf. At the end of the day you should make the decision based on the overall structure of your code: if it is predominantly Python logic executed directly on the data, it might be better to stick with RDDs or to skip Spark completely.

If it is just a fraction of the logic and doesn't cause a severe performance issue, don't sweat it.
