PySpark: Best practice to add more columns to a DataFrame

Question
Spark DataFrames have a method withColumn to add one new column at a time. To add multiple columns, a chain of withColumns is required. Is this the best practice?
I feel that using mapPartitions has more advantages. Let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions. It also helps if I have a database connection that I would prefer to open once per RDD partition.
My question has two parts.
The first part: this is my implementation of mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do this?
```python
def add_new_cols(rows):
    db = open_db_connection()
    new_rows = []
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2,
                              new_col_1, new_col_2)
        new_rows.append(new_f_row)
    db.close()
    return iter(new_rows)

df2 = df.rdd.mapPartitions(add_new_cols).toDF()
```
The second part: what are the tradeoffs of using mapPartitions over a chain of withColumn and filter?
I read somewhere that using the available methods with Spark DataFrames is always better than rolling out your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.
Answer
Are there any unforeseen issues with this approach?
Multiple. The most severe implications are:

- A memory footprint a few times higher compared to plain DataFrame code, and significant garbage-collection overhead.
- The high cost of serialization and deserialization required to move data between execution contexts.
- Introducing a breaking point in the query planner.
- As is, the cost of schema inference on the toDF call (which can be avoided if a proper schema is provided), and possible re-execution of all preceding steps.
- And so on...
Some of these can be avoided with udf and select / withColumn, others cannot.
Let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions.
Your mapPartitions doesn't remove any operations, and doesn't provide any optimizations that the Spark planner cannot apply on its own. Its only advantage is that it provides a nice scope for expensive connection objects.
I read somewhere that using the available methods with Spark DFs is always better than rolling out your own implementation.
When you start using executor-side Python logic you already diverge from Spark SQL. It doesn't matter if you use udf, RDD or the newly added vectorized udf. At the end of the day you should make the decision based on the overall structure of your code: if it is predominantly Python logic executed directly on the data, it might be better to stick with RDD or skip Spark completely.
If it is just a fraction of the logic and doesn't cause severe performance issues, don't sweat it.