join datasets with different dimensions - how to aggregate data properly


Problem description

I am working on some complex logic where I need to redistribute a quantity from one dataset to another.

This question is a continuation of an earlier question.

In the example below I am introducing several new dimensions. After aggregating and distributing all the quantities I expect the same total quantity, but I end up with some differences.

See the example below:

package playground

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, round, sum}

object sample3 {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  val sc = spark.sparkContext

  final case class Owner(a: Long,
                         b: String,
                         c: Long,
                         d: Short,
                         e: String,
                         f: String,
                         o_qtty: Double)

  // notice column d is not present in Invoice
  final case class Invoice(c: Long,
                           a: Long,
                           b: String,
                           e: String,
                           f: String,
                           i_qtty: Double)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    import spark.implicits._

    val ownerData = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 120),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val invoiceData = Seq(
      Invoice(444, 33, "C", "x", "y", 10),
      Invoice(999, 22, "B", "x", "y", 200),
      Invoice(666, 11, "A", "x", "y", 15),
      Invoice(555, 22, "B", "x", "y", 200),
      Invoice(888, 11, "A", "x", "y", 12),
    )

    val owners = spark
      .createDataset(ownerData)
      .as[Owner]
      .cache()

    val invoices = spark
      .createDataset(invoiceData)
      .as[Invoice]
      .cache()

    // p1: owners that have a matching invoice. Keep whatever is left after
    // subtracting the invoice quantity, and record the amount that has to be
    // redistributed (never letting the quantity go negative).
    val p1 = owners
      .join(invoices, Seq("a", "c", "e", "f", "b"))
      .selectExpr(
        "a",
        "d",
        "b",
        "e",
        "f",
        "c",
        "IF(o_qtty-i_qtty < 0,o_qtty,o_qtty - i_qtty) AS qtty",
        "IF(o_qtty-i_qtty < 0,0,i_qtty) AS to_distribute"
      )

    // p2: owners with no matching invoice. The left join leaves the invoice
    // columns null, which row.anyNull detects.
    val p2 = owners
      .join(invoices, Seq("a", "c", "e", "f", "b"), "left_outer")
      .filter(row => row.anyNull)
      .drop(col("i_qtty"))
      .withColumnRenamed("o_qtty", "qtty")

    // Total quantity to redistribute per (a, d, b, e, f) group.
    val distribute = p1
      .groupBy("a", "d", "b", "e", "f")
      .agg(sum(col("to_distribute")).as("to_distribute"))

    // Base against which each p2 row's proportional share is computed.
    val proportion = p2
      .groupBy("a", "d", "b", "e", "f")
      .agg(sum(col("qtty")).as("proportion"))

    // Each p2 row gets its proportional share of the group's distributed
    // quantity on top of its own quantity.
    val result = p2
      .join(distribute, Seq("a", "d", "b", "e", "f"))
      .join(proportion, Seq("a", "d", "b", "e", "f"))
      .withColumn(
        "qtty",
        round((col("to_distribute") / col("proportion")) * col("qtty") + col("qtty"), 2)
      )
      .drop("to_distribute", "proportion")
      .union(p1.drop("to_distribute"))

    result.show(false)
    result.selectExpr("SUM(qtty)").show()
    owners.selectExpr("SUM(o_qtty)").show()
    
    /*
    +---+----+---+---+---+---+-----+
    |a  |d   |b  |e  |f  |c  |qtty |
    +---+----+---+---+---+---+-----+
    |11 |2018|A  |x  |y  |222|27.71|
    |33 |2018|C  |x  |y  |555|126.0|
    |33 |2018|C  |x  |y  |666|84.0 |
    |11 |2018|A  |x  |y  |444|69.29|
    |11 |2017|A  |x  |y  |666|35.0 |
    |33 |2018|C  |x  |y  |444|10.0 |
    |22 |2018|B  |x  |y  |555|20.0 |
    |11 |2018|A  |x  |y  |888|88.0 |
    |11 |2018|A  |x  |y  |666|65.0 |
    +---+----+---+---+---+---+-----+
    
    +---------+
    |sum(qtty)|
    +---------+
    |    525.0|
    +---------+
    
    +-----------+
    |sum(o_qtty)|
    +-----------+
    |      640.0|
    +-----------+
     */
  }

}
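Where do the missing 115 units (640 - 525) go? Two inner joins silently drop data: the d=2017 group in distribute has no matching (a, d, b, e, f) key in p2, so its 15 units are never redistributed, and owner 99 has no group in distribute at all, so the join drops its 100 units. A small diagnostic sketch (assumed to run inside main, where p2 and distribute are in scope; left_anti keeps only the rows without a match on the other side):

// Groups in distribute with no counterpart in p2 -- notably the d=2017
// group, whose 15 units are never redistributed.
distribute.join(p2, Seq("a", "d", "b", "e", "f"), "left_anti").show(false)
// Rows of p2 with no counterpart in distribute -- here owner 99, whose
// 100 units are dropped entirely by the inner join.
p2.join(distribute, Seq("a", "d", "b", "e", "f"), "left_anti").show(false)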

Also, note that the aggregation must not produce any negative quantity.
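A quick guard for that invariant (a sketch; the assert is hypothetical and assumes it runs right after result is computed):

// Sanity check (sketch): fail fast if any redistributed quantity is negative.
assert(result.filter(col("qtty") < 0).isEmpty, "redistribution produced a negative quantity")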

Answer

I show only the code where changes were necessary.

val distribute = p1
  .groupBy("a", "b", "e", "f") // we no longer aggregate by field "d"
  .agg(sum(col("to_distribute")).as("to_distribute"))

val proportion = p2
  .groupBy("a", "b", "e", "f") // we no longer aggregate by field "d"
  .agg(sum(col("qtty")).as("proportion"))

// "d" is also removed from the join keys, and the first join becomes a
// left_outer: if to_distribute is null (there is no invoice data for that
// owner) we keep the original "qtty". Column "d" from p2 is renamed "year".
val result = p2
  .join(distribute, Seq("a", "b", "e", "f"), "left_outer")
  .join(proportion, Seq("a", "b", "e", "f"))
  .selectExpr(
    "a",
    "b",
    "e",
    "f",
    "c",
    "IF(ROUND(((to_distribute / proportion) * qtty) + qtty, 2) IS NULL, qtty, ROUND(((to_distribute / proportion) * qtty) + qtty, 2)) AS qtty",
    "d AS year"
  )
  .union(p1.withColumn("year", col("d")).drop("d", "to_distribute"))
  .orderBy(col("b"))
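The tables below can be reproduced with the same show/sum checks used in the question:

result.show(false)
result.selectExpr("SUM(qtty)").show()
owners.selectExpr("SUM(o_qtty)").show()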

Expected output:
+---+---+---+---+---+-----+----+
|a  |b  |e  |f  |c  |qtty |year|
+---+---+---+---+---+-----+----+
|11 |A  |x  |y  |444|80.0 |2018|
|11 |A  |x  |y  |222|32.0 |2018|
|11 |A  |x  |y  |666|65.0 |2018|
|11 |A  |x  |y  |888|88.0 |2018|
|11 |A  |x  |y  |666|35.0 |2017|
|22 |B  |x  |y  |555|20.0 |2018|
|33 |C  |x  |y  |555|126.0|2018|
|33 |C  |x  |y  |444|10.0 |2018|
|33 |C  |x  |y  |666|84.0 |2018|
|99 |D  |x  |y  |888|100.0|2018|
+---+---+---+---+---+-----+----+



+---------+
|sum(qtty)|
+---------+
|    640.0|
+---------+

+-----------+
|sum(o_qtty)|
+-----------+
|      640.0|
+-----------+

