In Spark, is there any alternative to the union() function when appending new rows?
Question
In my code, table_df has some columns on which I am computing statistics such as min, max, and mean, and I want to create new_df with the specified schema new_df_schema. In my logic, I run a spark-sql query per column and append each newly generated row to an initially empty new_df; at the end, new_df holds all the calculated values for all columns.
But the problem is that when the number of columns grows, this leads to a performance issue. Can this be done without using the union() function, or with any other approach that improves performance?
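As a side note (a built-in feature, not part of the original question): for the subset of these statistics that Spark ships with, DataFrame.summary (available since Spark 2.3) computes them for all columns in a single pass, which can serve as a baseline:

```scala
// A minimal baseline, assuming Spark 2.3+: summary() computes several
// statistics for every column in one job, though results come back as
// strings and it covers far fewer metrics than new_df_schema.
table_df.summary("count", "min", "max", "mean", "stddev").show(false)
```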
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._
val table_df = Seq(
(10, 20, 30, 40, 50),
(100, 200, 300, 400, 500),
(111, 222, 333, 444, 555),
(1123, 2123, 3123, 4123, 5123),
(1321, 2321, 3321, 4321, 5321)
).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
table_df.show(false)
table_df.createOrReplaceTempView("table_df")
val new_df_schema = StructType(
StructField("Column_Name", StringType, false) ::
StructField("number_of_values", LongType, false) ::
StructField("number_of_distinct_values", LongType, false) ::
StructField("distinct_count_with_nan", LongType, false) ::
StructField("distinct_count_without_nan", LongType, false) ::
StructField("is_unique", BooleanType, false) ::
StructField("number_of_missing_values", LongType, false) ::
StructField("percentage_of_missing_values", DoubleType, false) ::
StructField("percentage_of_unique_values", DoubleType, false) ::
StructField("05_PCT", DoubleType, false) ::
StructField("25_PCT", DoubleType, false) ::
StructField("50_PCT", DoubleType, false) ::
StructField("75_PCT", DoubleType, false) ::
StructField("95_PCT", DoubleType, false) ::
StructField("max", DoubleType, false) ::
StructField("min", DoubleType, false) ::
StructField("mean", DoubleType, false) ::
StructField("std", DoubleType, false) ::
StructField("skewness", DoubleType, false) ::
StructField("kurtosis", DoubleType, false) ::
StructField("range", DoubleType, false) ::
StructField("variance", DoubleType, false) :: Nil
)
var new_df = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], new_df_schema)
for (c <- table_df.columns) {
val num = sparkSession.sql(
s"""SELECT
| '$c' AS Column_Name,
| COUNT(${c}) AS number_of_values,
| COUNT(DISTINCT ${c}) AS number_of_distinct_values,
| COUNT(DISTINCT ${c}) AS distinct_count_with_nan,
| (COUNT(DISTINCT ${c}) - 1) AS distinct_count_without_nan,
| (COUNT(${c}) == COUNT(DISTINCT ${c})) AS is_unique,
| (COUNT(*) - COUNT(${c})) AS number_of_missing_values,
| ((COUNT(*) - COUNT(${c}))/COUNT(*)) AS percentage_of_missing_values,
| (COUNT(DISTINCT ${c})/COUNT(*)) AS percentage_of_unique_values,
| APPROX_PERCENTILE($c,0.05) AS 05_PCT,
| APPROX_PERCENTILE($c,0.25) AS 25_PCT,
| APPROX_PERCENTILE($c,0.50) AS 50_PCT,
| APPROX_PERCENTILE($c,0.75) AS 75_PCT,
| APPROX_PERCENTILE($c,0.95) AS 95_PCT,
| MAX($c) AS max,
| MIN($c) AS min,
| MEAN($c) AS mean,
| STD($c) AS std,
| SKEWNESS($c) AS skewness,
| KURTOSIS($c) AS kurtosis,
| (MAX($c) - MIN($c)) AS range,
| VARIANCE($c) AS variance
| FROM
| table_df""".stripMargin)
.toDF()
new_df = new_df.union(num) // this causes a performance issue when the number of columns in table_df is large
}
new_df.show(false)
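If the per-column SQL must stay, one mitigation (a sketch, not from the original post) is to build all per-column DataFrames first and union them once via reduce, rather than reassigning the mutable new_df on every iteration. Only a few of the original statistics are included here for brevity; the logical plan still grows with the column count, so a single-pass aggregation remains preferable.

```scala
// Sketch under the same setup as above (table_df registered as a temp view):
// one DataFrame per column, combined with a single reduce over union.
val perColumn = table_df.columns.map { c =>
  sparkSession.sql(
    s"""SELECT '$c'      AS Column_Name,
       |       COUNT($c) AS number_of_values,
       |       MAX($c)   AS max,
       |       MIN($c)   AS min,
       |       MEAN($c)  AS mean
       |FROM table_df""".stripMargin)
}
val reduced_df = perColumn.reduce(_ union _)
reduced_df.show(false)
```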
==================================================
table_df:
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|111 |222 |333 |444 |555 |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
new_df:
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|Column_Name|number_of_values|number_of_distinct_values|distinct_count_with_nan|distinct_count_without_nan|is_unique|number_of_missing_values|percentage_of_missing_values|percentage_of_unique_values|05_PCT|25_PCT|50_PCT|75_PCT|95_PCT|max |min |mean |std |skewness |kurtosis |range |variance |
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|col_1 |5 |5 |5 |4 |true |0 |0.0 |1.0 |10.0 |100.0 |111.0 |1123.0|1321.0|1321.0|10.0|533.0 |634.0634826261484 |0.4334269738367067 |-1.7463346405299973|1311.0|402036.5 |
|col_2 |5 |5 |5 |4 |true |0 |0.0 |1.0 |20.0 |200.0 |222.0 |2123.0|2321.0|2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738682 |-1.799741951675132 |2301.0|1302313.7 |
|col_3 |5 |5 |5 |4 |true |0 |0.0 |1.0 |30.0 |300.0 |333.0 |3123.0|3321.0|3321.0|30.0|1421.4|1649.399072389699 |0.3979251063785061 |-1.8119558312496054|3291.0|2720517.3 |
|col_4 |5 |5 |5 |4 |true |0 |0.0 |1.0 |40.0 |400.0 |444.0 |4123.0|4321.0|4321.0|40.0|1865.6|2157.926620624529 |0.39502047381456235|-1.8165124206347685|4281.0|4656647.3 |
|col_5 |5 |5 |5 |4 |true |0 |0.0 |1.0 |50.0 |500.0 |555.0 |5123.0|5321.0|5321.0|50.0|2309.8|2666.59027598917 |0.3935246673563026 |-1.8186685628112493|5271.0|7110703.699999999|
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
Recommended answer
An alternative to union.
Check the code below.
scala> df.show(false)
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|111 |222 |333 |444 |555 |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
Build the required expressions.
scala> val descExpr = array(
df.columns
.map(c => struct(
lit(c).cast("string").as("column_name"),
max(col(c)).cast("string").as("max"),
min(col(c)).cast("string").as("min"),
mean(col(c)).cast("string").as("mean"),
stddev(col(c)).cast("string").as("std"),
skewness(col(c)).cast("string").as("skewness"),
kurtosis(col(c)).cast("string").as("kurtosis")
)
):_*
).as("data")
The required columns.
val columns = Seq("column_name","max","min","mean","std","skewness","kurtosis")
  .map(c => if (c != "column_name") col(c).cast("double").as(c) else col(c))
Final output
scala> df
.select(descExpr)
.selectExpr("explode(data) as data")
.select("data.*")
.select(columns:_*)
.show(false)
+-----------+------+----+------+------------------+-------------------+-------------------+
|column_name|max |min |mean |std |skewness |kurtosis |
+-----------+------+----+------+------------------+-------------------+-------------------+
|col_1 |1321.0|10.0|533.0 |634.0634826261484 |0.43342697383670664|-1.7463346405299978|
|col_2 |2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738679 |-1.7997419516751327|
|col_3 |3321.0|30.0|1421.4|1649.3990723896993|0.397925106378506 |-1.8119558312496056|
|col_4 |4321.0|40.0|1865.6|2157.9266206245293|0.3950204738145622 |-1.8165124206347691|
|col_5 |5321.0|50.0|2309.8|2666.5902759891706|0.3935246673563026 |-1.81866856281125 |
+-----------+------+----+------+------------------+-------------------+-------------------+
Update
scala> val finalDF = df.select(descExpr).selectExpr("explode(data) as data").select("data.*").select(columns:_*)
Create a new dataframe with approxQuantile for all columns.
scala> val approxQuantileDF = df
.columns
.map(c => (c,df.stat.approxQuantile(c,Array(0.25,0.5,0.75),0.0)))
.toList
.toDF("column_name","approx_quantile")
scala> finalDF
.join(approxQuantileDF,
Seq("column_name"),
"left"
).show(false)
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
|column_name|max |min |mean |std |skewness |kurtosis |approx_quantile |
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
|col_1 |1321.0|10.0|533.0 |634.0634826261484 |0.43342697383670664|-1.7463346405299978|[100.0, 111.0, 1123.0]|
|col_2 |2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738679 |-1.7997419516751327|[200.0, 222.0, 2123.0]|
|col_3 |3321.0|30.0|1421.4|1649.3990723896993|0.397925106378506 |-1.8119558312496056|[300.0, 333.0, 3123.0]|
|col_4 |4321.0|40.0|1865.6|2157.9266206245293|0.3950204738145622 |-1.8165124206347691|[400.0, 444.0, 4123.0]|
|col_5 |5321.0|50.0|2309.8|2666.5902759891706|0.3935246673563026 |-1.81866856281125 |[500.0, 555.0, 5123.0]|
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
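As a small follow-up sketch (not in the original answer): if separate percentile columns are preferred over the array, the elements of the approx_quantile column can be pulled out by index:

```scala
// Hypothetical extension: expose the 25/50/75 percentiles as their own
// columns by indexing into the approx_quantile array column.
val quantileCols = approxQuantileDF.select(
  col("column_name"),
  col("approx_quantile")(0).as("25_PCT"),
  col("approx_quantile")(1).as("50_PCT"),
  col("approx_quantile")(2).as("75_PCT")
)
quantileCols.show(false)
```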
This concludes the article on whether there is an alternative to the union() function when appending new rows in Spark. We hope the recommended answer helps, and thank you for supporting IT屋!