如何有效地在 Spark Datframe 中添加多列 [英] How can i add multiple columns in Spark Datframe in efficiently

查看:50
本文介绍了如何有效地在 Spark Datframe 中添加多列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一组列名称,需要在现有数据框中添加这些列,这些列的大小也非常大,我需要使用 StringType 和默认空值将集合中的所有列添加到数据框中.我遵循以下方法,但我发现当列数和数据帧大小很大时,这会影响我的性能.有没有更好的方法来解决这个问题?注意:列数:~500

I have set of columns names and need to add those columns in existing dataframe which is also very huge in size, i need to add the all columns from set to dataframe with StringType and default null value. I am following below approach but i found that when the number of columns and dataframe size is huge this affecting my performance. Is there any better way to this in spark? Note : Number of columns : ~500

import sparkSession.sqlContext.implicits._
var df = Seq(
  (1, "James"),
  (2, "Michael"),
  (3, "Robert"),
  (4, "Washington"),
  (5, "Jefferson")
).toDF("Id", "Name")
df.show(false)

val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
diff_set.foreach(x => {
  if (x.size > 0) {
    df = df.withColumn(x, lit(null)).withColumn(x, col(x).cast(StringType))
  }
})
df.show(false)

+---+----------+
|Id |Name      |
+---+----------+
|1  |James     |
|2  |Michael   |
|3  |Robert    |
|4  |Washington|
|5  |Jefferson |
+---+----------+

+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|Id |Name      |col7|col8|col3|col17|col6|col20|col2|col14|col16|col21|col15|col9|col10|col5|col1|col13|col19|col11|col22|col18|col4|col12|
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|1  |James     |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|2  |Michael   |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|3  |Robert    |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|4  |Washington|null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|5  |Jefferson |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+

推荐答案

使用 select

df
.select(
    df.columns.map(c => col(c).as(c)) ++ 
    diff_set.map(c => lit(null).cast("string").as(c)):_*
)
.show(false)

使用foldLeft

scala> df.show(false)
+---+----------+
|Id |Name      |
+---+----------+
|1  |James     |
|2  |Michael   |
|3  |Robert    |
|4  |Washington|
|5  |Jefferson |
+---+----------+

scala> val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet

scala> 

diff_set
.foldLeft(df)((ddf,c) => 
    ddf
    .withColumn(c,lit(null).cast("string"))
)
.show(false)

+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|Id |Name      |col7|col8|col3|col17|col6|col20|col2|col14|col16|col21|col15|col9|col10|col5|col1|col13|col19|col11|col22|col18|col4|col12|
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+
|1  |James     |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|2  |Michael   |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|3  |Robert    |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|4  |Washington|null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
|5  |Jefferson |null|null|null|null |null|null |null|null |null |null |null |null|null |null|null|null |null |null |null |null |null|null |
+---+----------+----+----+----+-----+----+-----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+----+-----+

对比

1000000 条记录使用 foldLeft - 所用时间:18017 毫秒

Using foldLeft for 1000000 records - Time taken: 18017 ms

spark.time {
    val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
    val df = (1 to 1000000).toDF
    diff_set.foldLeft(df)((ddf,c) => ddf.withColumn(c,lit(null).cast("string"))).show(false)
}

1000000 条记录使用 crossJoin - 所用时间:13224 毫秒

Using crossJoin for 1000000 records - Time taken: 13224 ms

spark.time {
    val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
    val df = (1 to 1000000).toDF
    val dfb = Seq(("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null")).toDF(diff_set.toList:_*)
    df.crossJoin(dfb).show(false)
}

1000000 条记录使用 select - 所用时间:8519 毫秒

Using select for 1000000 records - Time taken: 8519 ms

spark.time {
    val diff_set = Seq("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11", "col12", "col13", "col14", "col15", "col16", "col17", "col18", "col19", "col20", "col21", "col22").toSet
    val df = (1 to 1000000).toDF
    df.select(df.columns.map(c => col(c).as(c)) ++ diff_set.map(c => lit(null).cast("string").as(c)):_*).show
}

这篇关于如何有效地在 Spark Datframe 中添加多列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆