In Spark, is there any alternative to the union() function when appending new rows?


Question

In my code, table_df has some columns on which I am doing calculations such as min, max, and mean, and I want to create new_df with the specified schema new_df_schema. In my logic, I write Spark SQL for the calculations and append each newly generated row to the initially empty new_df; at the end, this produces new_df with all the calculated values for all columns.

But when the number of columns grows, this approach leads to a performance problem. Can this be done without using the union() function, or is there another approach that improves performance?

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._

    val table_df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
    table_df.show(false)

    table_df.createOrReplaceTempView("table_df")

    val new_df_schema = StructType(
      StructField("Column_Name", StringType, false) ::
        StructField("number_of_values", LongType, false) ::
        StructField("number_of_distinct_values", LongType, false) ::
        StructField("distinct_count_with_nan", LongType, false) ::
        StructField("distinct_count_without_nan", LongType, false) ::
        StructField("is_unique", BooleanType, false) ::
        StructField("number_of_missing_values", LongType, false) ::
        StructField("percentage_of_missing_values", DoubleType, false) ::
        StructField("percentage_of_unique_values", DoubleType, false) ::
        StructField("05_PCT", DoubleType, false) ::
        StructField("25_PCT", DoubleType, false) ::
        StructField("50_PCT", DoubleType, false) ::
        StructField("75_PCT", DoubleType, false) ::
        StructField("95_PCT", DoubleType, false) ::
        StructField("max", DoubleType, false) ::
        StructField("min", DoubleType, false) ::
        StructField("mean", DoubleType, false) ::
        StructField("std", DoubleType, false) ::
        StructField("skewness", DoubleType, false) ::
        StructField("kurtosis", DoubleType, false) ::
        StructField("range", DoubleType, false) ::
        StructField("variance", DoubleType, false) :: Nil
    )
    var new_df = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], new_df_schema)

    for (c <- table_df.columns) {
      val num = sparkSession.sql(
        s"""SELECT
           | '$c' AS Column_Name,
           | COUNT(${c}) AS number_of_values,
           | COUNT(DISTINCT ${c}) AS number_of_distinct_values,
           | COUNT(DISTINCT ${c}) AS distinct_count_with_nan,
           | (COUNT(DISTINCT ${c}) - 1) AS distinct_count_without_nan,
           | (COUNT(${c}) == COUNT(DISTINCT ${c})) AS is_unique,
           | (COUNT(*) - COUNT(${c})) AS number_of_missing_values,
           | ((COUNT(*) - COUNT(${c}))/COUNT(*)) AS percentage_of_missing_values,
           | (COUNT(DISTINCT ${c})/COUNT(*)) AS percentage_of_unique_values,
           | APPROX_PERCENTILE($c,0.05) AS 05_PCT,
           | APPROX_PERCENTILE($c,0.25) AS 25_PCT,
           | APPROX_PERCENTILE($c,0.50) AS 50_PCT,
           | APPROX_PERCENTILE($c,0.75) AS 75_PCT,
           | APPROX_PERCENTILE($c,0.95) AS 95_PCT,
           | MAX($c) AS max,
           | MIN($c) AS min,
           | MEAN($c) AS mean,
           | STD($c) AS std,
           | SKEWNESS($c) AS skewness,
           | KURTOSIS($c) AS kurtosis,
           | (MAX($c) - MIN($c)) AS range,
           | VARIANCE($c) AS variance
           | FROM
           | table_df""".stripMargin)
        .toDF()
      new_df = new_df.union(num) // this per-column union is what causes the performance issue when table_df has many columns
    }
    new_df.show(false)

==================================================
table_df:
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|111  |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+

new_df:

+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|Column_Name|number_of_values|number_of_distinct_values|distinct_count_with_nan|distinct_count_without_nan|is_unique|number_of_missing_values|percentage_of_missing_values|percentage_of_unique_values|05_PCT|25_PCT|50_PCT|75_PCT|95_PCT|max   |min |mean  |std               |skewness           |kurtosis           |range |variance         |
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|col_1      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |10.0  |100.0 |111.0 |1123.0|1321.0|1321.0|10.0|533.0 |634.0634826261484 |0.4334269738367067 |-1.7463346405299973|1311.0|402036.5         |
|col_2      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |20.0  |200.0 |222.0 |2123.0|2321.0|2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738682 |-1.799741951675132 |2301.0|1302313.7        |
|col_3      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |30.0  |300.0 |333.0 |3123.0|3321.0|3321.0|30.0|1421.4|1649.399072389699 |0.3979251063785061 |-1.8119558312496054|3291.0|2720517.3        |
|col_4      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |40.0  |400.0 |444.0 |4123.0|4321.0|4321.0|40.0|1865.6|2157.926620624529 |0.39502047381456235|-1.8165124206347685|4281.0|4656647.3        |
|col_5      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |50.0  |500.0 |555.0 |5123.0|5321.0|5321.0|50.0|2309.8|2666.59027598917  |0.3935246673563026 |-1.8186685628112493|5271.0|7110703.699999999|
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+

Answer

An alternative to union.

Check the code below.

scala> df.show(false)
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|111  |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
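
As an aside (my note, not part of the original answer): if the built-in output shape is acceptable, Spark's DataFrame.summary() already computes count, mean, stddev, min, max, and approximate percentiles for all columns in a single pass, with statistics as rows and the input columns as columns:

scala> df.summary("count", "min", "max", "mean", "stddev", "25%", "50%", "75%").show(false)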

Build the required expression: one struct of aggregate functions per column, collected into a single array.

scala> val descExpr = array(
    df.columns
    .map(c => struct(
        lit(c).cast("string").as("column_name"),
        max(col(c)).cast("string").as("max"),
        min(col(c)).cast("string").as("min"),
        mean(col(c)).cast("string").as("mean"),
        stddev(col(c)).cast("string").as("std"),
        skewness(col(c)).cast("string").as("skewness"),
        kurtosis(col(c)).cast("string").as("kurtosis")
        )
    ):_*
).as("data")

The required output columns, casting the numeric statistics back to double.

val columns = Seq("column_name", "max", "min", "mean", "std", "skewness", "kurtosis")
  .map(c => if (c != "column_name") col(c).cast("double").as(c) else col(c))

Final output.

scala> df
 .select(descExpr)
 .selectExpr("explode(data) as data")
 .select("data.*")
 .select(columns:_*)
 .show(false)

+-----------+------+----+------+------------------+-------------------+-------------------+
|column_name|max   |min |mean  |std               |skewness           |kurtosis           |
+-----------+------+----+------+------------------+-------------------+-------------------+
|col_1      |1321.0|10.0|533.0 |634.0634826261484 |0.43342697383670664|-1.7463346405299978|
|col_2      |2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738679 |-1.7997419516751327|
|col_3      |3321.0|30.0|1421.4|1649.3990723896993|0.397925106378506  |-1.8119558312496056|
|col_4      |4321.0|40.0|1865.6|2157.9266206245293|0.3950204738145622 |-1.8165124206347691|
|col_5      |5321.0|50.0|2309.8|2666.5902759891706|0.3935246673563026 |-1.81866856281125  |
+-----------+------+----+------+------------------+-------------------+-------------------+
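
The same struct-per-column trick extends to the remaining statistics in new_df_schema, still in a single aggregation and without any union. A minimal sketch (my addition, not from the original answer; it reuses the imports from the question and relies on approx_percentile being available as a SQL function, as the question's own queries already assume):

val fullExpr = array(
  df.columns.map { c =>
    struct(
      lit(c).cast("string").as("column_name"),
      count(col(c)).cast("string").as("number_of_values"),
      countDistinct(col(c)).cast("string").as("number_of_distinct_values"),
      (count(lit(1)) - count(col(c))).cast("string").as("number_of_missing_values"),
      expr(s"approx_percentile($c, 0.05)").cast("string").as("05_PCT"),
      expr(s"approx_percentile($c, 0.95)").cast("string").as("95_PCT"),
      max(col(c)).cast("string").as("max"),
      min(col(c)).cast("string").as("min"),
      (max(col(c)) - min(col(c))).cast("string").as("range"),
      variance(col(c)).cast("string").as("variance")
    )
  }: _*
).as("data")

df.select(fullExpr).selectExpr("explode(data) as data").select("data.*").show(false)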

Update

scala> val finalDF = df.select(descExpr).selectExpr("explode(data) as data").select("data.*").select(columns:_*)

Create a new dataframe with approxQuantile values for all columns.

scala> val approxQuantileDF = df
  .columns
  .map(c => (c, df.stat.approxQuantile(c, Array(0.25, 0.5, 0.75), 0.0)))
  .toList
  .toDF("column_name", "approx_quantile")

scala> finalDF
  .join(approxQuantileDF, Seq("column_name"), "left")
  .show(false)
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
|column_name|max   |min |mean  |std               |skewness           |kurtosis           |approx_quantile       |
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
|col_1      |1321.0|10.0|533.0 |634.0634826261484 |0.43342697383670664|-1.7463346405299978|[100.0, 111.0, 1123.0]|
|col_2      |2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738679 |-1.7997419516751327|[200.0, 222.0, 2123.0]|
|col_3      |3321.0|30.0|1421.4|1649.3990723896993|0.397925106378506  |-1.8119558312496056|[300.0, 333.0, 3123.0]|
|col_4      |4321.0|40.0|1865.6|2157.9266206245293|0.3950204738145622 |-1.8165124206347691|[400.0, 444.0, 4123.0]|
|col_5      |5321.0|50.0|2309.8|2666.5902759891706|0.3935246673563026 |-1.81866856281125  |[500.0, 555.0, 5123.0]|
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
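
To match the separate 25_PCT/50_PCT/75_PCT columns of the original schema, the approx_quantile array can be split by index. A sketch (my addition; the name result is hypothetical):

val result = finalDF
  .join(approxQuantileDF, Seq("column_name"), "left")
  .withColumn("25_PCT", col("approx_quantile")(0))  // quantiles come back in the order requested above
  .withColumn("50_PCT", col("approx_quantile")(1))
  .withColumn("75_PCT", col("approx_quantile")(2))
  .drop("approx_quantile")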
