Quantity redistribution logic - MapGroups with external dataset

Question

I am working on a complex piece of logic where I need to redistribute a quantity from one dataset to another.

In the example we have Owner and Invoice. We need to subtract the Invoice quantity from the exact Owner match (the row with the same postal code and car). The subtracted quantity then needs to be redistributed to the other postal codes where the same car appears. The complication is that we must avoid redistributing to postal codes where the same car is also present in the Invoice table under another pcode.

Finally, in case the subtraction or the re-distribution produces a negative value, we should avoid this transformation for the given Invoice.

Here is an example with numbers
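
For the first case (the "red" one mentioned below), the intended arithmetic can be sketched in plain Scala; the numbers are taken from the datasets defined in the code further down, and Owner("A", "888", 100) is deliberately left out of the redistribution because car A also appears in the invoices at pcode 888:

// Plain-Scala sketch of the intended arithmetic for Invoice("A", "666", 15),
// assuming the Owner rows defined in the code below.
val invoiceQtty = 15.0
val exactMatch  = 80.0 - invoiceQtty                 // Owner("A", "666", 80) -> 65.0
val eligible    = Seq(("444", 50.0), ("222", 20.0))  // pcodes of car A absent from the invoices
val total       = eligible.map(_._2).sum             // 70.0
val redistributed = eligible.map { case (pcode, q) =>
  (pcode, q + invoiceQtty * q / total)               // 444 -> 60.71..., 222 -> 24.28...
}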

Below is the code version, but unfortunately it doesn't work as expected. More specifically, I don't know how to skip the records of cars that are present multiple times in the Invoice. In the first example (red), I don't know how to skip the record Owner(A, 888, 100).

package playground

import org.apache.spark.sql.SparkSession


object basic extends App {
  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  final case class Owner(car: String, pcode: String, qtty: Double)
  final case class Invoice(car: String, pcode: String, qtty: Double)

  val sc = spark.sparkContext

  val data = Seq(
    Owner("A", "666", 80),
    Owner("B", "555", 20),
    Owner("A", "444", 50),
    Owner("A", "222", 20),
    Owner("C", "444", 20),
    Owner("C", "666", 80),
    Owner("C", "555", 120),
    Owner("A", "888", 100)
  )

  val fleet = Seq(
    Invoice("A", "666", 15),
    Invoice("C", "444", 10),
    Invoice("A", "888", 12),
    Invoice("B", "555", 200)
  )

  val owners = spark.createDataset(data)
  val invoices = spark.createDataset(fleet)

  val actual = owners
    .joinWith(invoices, owners("car") === invoices("car"), joinType = "right")
    .groupByKey(_._2)
    .flatMapGroups {
      case (invoice, group) =>
        val subOwner: Vector[Owner] = group.toVector.map(_._1)
        val householdToBeInvoiced: Vector[Owner] =
          subOwner.filter(_.pcode == invoice.pcode)
        val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
          // negative compensation (remove the quantity from Invoice for the exact match)
          val neg: Owner = householdToBeInvoiced.head
          val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - invoice.qtty)

          // positive compensation (redistribute the "removed" quantity proportionally but not for pcode existing in
          // invoice for the same car)
          val pos = subOwner.filter(s => s.pcode != invoice.pcode)
          val totalQuantityOwner = pos.map(_.qtty).sum
          val calculatedPos: Vector[Owner] =
            pos.map(
              c =>
                c.copy(
                  qtty = c.qtty + invoice.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
              )
            )

          (calculatedPos :+ calculatedNeg)
        } else {
          subOwner
        }

        modifiedOwner
    }

  // display the transformed owners (produces the table shown below)
  actual.show()
}

This code produces:

+---+-----+------------------+
|car|pcode|              qtty|
+---+-----+------------------+
|  A|  888|116.66666666666667|
|  A|  222|23.333333333333332|
|  A|  444|58.333333333333336|
|  A|  666|              65.0|
|  C|  555|126.66666666666667|
|  C|  666| 84.44444444444444|
|  C|  444|              10.0|
|  B|  555|            -180.0|
|  A|  222|              24.8|
|  A|  444|              62.0|
|  A|  666|              99.2|
|  A|  888|              88.0|
+---+-----+------------------+

Any support will be much appreciated! Thanks

After some more thought on this problem, I managed to improve the code, but I still cannot get the iterative approach in place (using the previous computation to compute the next one, e.g. using the result of the red record to produce the blue record, etc.).
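
For illustration only, the iterative idea (applying one invoice at a time so that each step sees the result of the previous one) could be sketched on plain Scala collections like this; it is not a Spark/distributed solution, and it assumes the Owner/Invoice case classes and the data/fleet sequences from the code below:

// Non-distributed sketch: fold over the invoices so each step starts from the
// owners produced by the previous step.
def applyOne(owners: Vector[Owner], inv: Invoice, allInvoices: Seq[Invoice]): Vector[Owner] =
  owners.find(o => o.car == inv.car && o.pcode == inv.pcode) match {
    case Some(neg) =>
      // pcodes of this car that appear anywhere in the invoices are excluded from redistribution
      val invoicedPcodes = allInvoices.filter(_.car == inv.car).map(_.pcode).toSet
      val pos   = owners.filter(o => o.car == inv.car && !invoicedPcodes.contains(o.pcode))
      val total = pos.map(_.qtty).sum
      val updated = owners.map {
        case o if o == neg        => o.copy(qtty = o.qtty - inv.qtty)
        case o if pos.contains(o) => o.copy(qtty = o.qtty + inv.qtty * o.qtty / total)
        case o                    => o
      }
      // skip the whole transformation if it would produce a negative quantity
      if (updated.exists(_.qtty < 0)) owners else updated
    case None => owners
  }

val iterated: Vector[Owner] = fleet.foldLeft(data.toVector)((acc, inv) => applyOne(acc, inv, fleet))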

package playground

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object basic extends App {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  val spark = SparkSession
    .builder()
    .appName("Spark Optimization Playground")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  final case class Owner(car: String, pcode: String, qtty: Double)
  final case class Invoice(car: String, pcode: String, qtty: Double)

  val data = Seq(
    Owner("A", "666", 80),
    Owner("B", "555", 20),
    Owner("A", "444", 50),
    Owner("A", "222", 20),
    Owner("C", "444", 20),
    Owner("C", "666", 80),
    Owner("C", "555", 120),
    Owner("A", "888", 100)
  )

  val fleet = Seq(
    Invoice("A", "666", 15),
    Invoice("C", "444", 10),
    Invoice("A", "888", 12),
    Invoice("B", "555", 200)
  )

  val owners = spark.createDataset(data)
  val invoices = spark.createDataset(fleet)

  val secondFleets = invoices.map(identity)

  val fleetPerCar =
    invoices
      .joinWith(secondFleets, invoices("car") === secondFleets("car"), "inner")
      .groupByKey(_._1)
      .flatMapGroups {
        case (value, iter) => Iterator((value, iter.toArray))
      }

  val gb
    : KeyValueGroupedDataset[(Invoice, Array[(Invoice, Invoice)]),
                             (Owner, (Invoice, Array[(Invoice, Invoice)]))] =
    owners
      .joinWith(fleetPerCar, owners("car") === fleetPerCar("_1.car"), "right")
      .groupByKey(_._2)

  val x: Dataset[Owner] =
    gb.flatMapGroups {
      case (fleet, group) =>
        val subOwner: Vector[Owner] = group.toVector.map(_._1)
        val householdToBeInvoiced: Vector[Owner] =
          subOwner.filter(_.pcode == fleet._1.pcode)
        val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
          // negative compensation (remove the quantity from Invoice for the exact match)
          val neg: Owner = householdToBeInvoiced.head
          val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - fleet._1.qtty)

          // positive compensation (redistribute the "removed" quantity proportionally but not for pcode existing in
          // invoice for the same car)
          val otherPCode =
            fleet._2.filter(_._2.pcode != fleet._1.pcode).map(_._2.pcode)

          val pos = subOwner.filter(
            s => s.pcode != fleet._1.pcode && !otherPCode.contains(s.pcode)
          )
          val totalQuantityOwner = pos.map(_.qtty).sum + neg.qtty
          val calculatedPos: Vector[Owner] =
            pos.map(
              c =>
                c.copy(
                  qtty = c.qtty + fleet._1.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
              )
            )
          // if pos or neg compensation produce negative quantity, skip the computation
          val res = (calculatedPos :+ calculatedNeg)
          if (res.exists(_.qtty < 0)) {
            subOwner
          } else {
            res
          }
        } else {
          subOwner
        }

        modifiedOwner
    }
  x.show()
}

Answer

The first solution is based on Spark Datasets and SparkSQL and provides the expected results.

There are many ways to configure this approach, even taking performance issues into account, which may be discussed later on.

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object basic {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .getOrCreate()

  val sc = spark.sparkContext

  case class Owner(car: String, pcode: String, qtty: Double)
  case class Invoice(car: String, pcode: String, qtty: Double)

  def main(args: Array[String]): Unit = {

    val data = Seq(
      Owner("A", "666", 80),
      Owner("B", "555", 20),
      Owner("A", "444", 50),
      Owner("A", "222", 20),
      Owner("C", "444", 20),
      Owner("C", "666", 80),
      Owner("C", "555", 120),
      Owner("A", "888", 100)
    )

    val fleet = Seq(
      Invoice("A", "666", 15),
      Invoice("C", "666", 10),
      Invoice("A", "888", 12),
      Invoice("B", "555", 200)
    )

    val expected = Seq(
      Owner("A", "666", 65),
      Owner("B", "555", 20), // not redistributed because produce a negative value
      Owner("A", "444", 69.29),
      Owner("A", "222", 27.71),
      Owner("C", "444", 21.43),
      Owner("C", "666", 70),
      Owner("C", "555", 128.57),
      Owner("A", "888", 88)
    )

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val owners = spark.createDataset(data).as[Owner].cache()
      val invoices = spark.createDataset(fleet).as[Invoice].cache()

      owners.createOrReplaceTempView("owners")
      invoices.createOrReplaceTempView("invoices")

      /**
        * this part fetches car and pcode from owner with the subtracted quantity from invoice
        */
      val p1 = spark.sql(
        """SELECT i.car,i.pcode,
          |CASE WHEN (o.qtty - i.qtty) < 0 THEN o.qtty ELSE (o.qtty - i.qtty) END AS qtty,
          |CASE WHEN (o.qtty - i.qtty) < 0 THEN 0 ELSE i.qtty END AS to_distribute
          |FROM owners o
          |INNER JOIN invoices i  ON(i.car = o.car AND i.pcode = o.pcode)
          |""".stripMargin)
        .cache()
      p1.createOrReplaceTempView("p1")

      /**
        * this part fetches all the cars and pcodes whose quantity has to be redistributed
        */
      val p2 = spark.sql(
        """SELECT o.car, o.pcode, o.qtty
          |FROM owners o
          |LEFT OUTER JOIN invoices i  ON(i.car = o.car AND i.pcode = o.pcode)
          |WHERE i.car IS NULL
          |""".stripMargin)
        .cache()
      p2.createOrReplaceTempView("p2")

      /**
        * this part fetches the quantity to distribute
        */
      val distribute = spark.sql(
        """
          |SELECT car, SUM(to_distribute) AS to_distribute
          |FROM p1
          |GROUP BY car
          |""".stripMargin)
        .cache()
      distribute.createOrReplaceTempView("distribute")

      /**
        * this part fetches the proportion used to distribute proportionally
        */
      val proportion = spark.sql(
        """
          |SELECT car, SUM(qtty) AS proportion
          |FROM p2
          |GROUP BY car
          |""".stripMargin)
          .cache()
      proportion.createOrReplaceTempView("proportion")


      /**
        * this part joins p1 and p2 with the calculated distribution
        */
      val result = spark.sql(
        """
          |SELECT p2.car, p2.pcode, ROUND(((to_distribute / proportion) * qtty) + qtty, 2) AS qtty
          |FROM p2
          |JOIN distribute d ON(p2.car = d.car)
          |JOIN proportion p ON(d.car = p.car)
          |UNION ALL
          |SELECT car, pcode, qtty
          |FROM p1
          |""".stripMargin)

      result.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty  |
+---+-----+------+
|A  |444  |69.29 |
|A  |222  |27.71 |
|C  |444  |21.43 |
|C  |555  |128.57|
|A  |666  |65.0  |
|B  |555  |20.0  |
|C  |666  |70.0  |
|A  |888  |88.0  |
+---+-----+------+
*/

      expected
        .toDF("car","pcode","qtty")
        .show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty  |
+---+-----+------+
|A  |666  |65.0  |
|B  |555  |20.0  |
|A  |444  |69.29 |
|A  |222  |27.71 |
|C  |444  |21.43 |
|C  |666  |70.0  |
|C  |555  |128.57|
|A  |888  |88.0  |
+---+-----+------+
*/

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

Using the Datasets API

Another approach to this problem, with the same results, would be to use Datasets and their API, as in this example:

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

object basic2 {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .getOrCreate()

  val sc = spark.sparkContext

  final case class Owner(car: String, pcode: String, o_qtty: Double)
  final case class Invoice(car: String, pcode: String, i_qtty: Double)

  def main(args: Array[String]): Unit = {

    val data = Seq(
      Owner("A", "666", 80),
      Owner("B", "555", 20),
      Owner("A", "444", 50),
      Owner("A", "222", 20),
      Owner("C", "444", 20),
      Owner("C", "666", 80),
      Owner("C", "555", 120),
      Owner("A", "888", 100)
    )

    val fleet = Seq(
      Invoice("A", "666", 15),
      Invoice("C", "666", 10),
      Invoice("A", "888", 12),
      Invoice("B", "555", 200)
    )

    val expected = Seq(
      Owner("A", "666", 65),
      Owner("B", "555", 20), // not redistributed because produce a negative value
      Owner("A", "444", 69.29),
      Owner("A", "222", 27.71),
      Owner("C", "444", 21.43),
      Owner("C", "666", 70),
      Owner("C", "555", 128.57),
      Owner("A", "888", 88)
    )

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val owners = spark.createDataset(data)
        .as[Owner]
        .cache()

      val invoices = spark.createDataset(fleet)
        .as[Invoice]
        .cache()

      val p1 = owners
        .join(invoices,Seq("car","pcode"),"inner")
        .selectExpr("car","pcode","IF(o_qtty-i_qtty < 0,o_qtty,o_qtty - i_qtty) AS qtty","IF(o_qtty-i_qtty < 0,0,i_qtty) AS to_distribute")
        .persist(StorageLevel.MEMORY_ONLY)

      val p2 = owners
        .join(invoices,Seq("car","pcode"),"left_outer")
        .filter(row => row.anyNull == true)
        .drop(col("i_qtty"))
        .withColumnRenamed("o_qtty","qtty")
        .persist(StorageLevel.MEMORY_ONLY)

      val distribute = p1
        .groupBy(col("car"))
        .agg(sum(col("to_distribute")).as("to_distribute"))
        .persist(StorageLevel.MEMORY_ONLY)

      val proportion = p2
          .groupBy(col("car"))
          .agg(sum(col("qtty")).as("proportion"))
          .persist(StorageLevel.MEMORY_ONLY)

      val result = p2
        .join(distribute, "car")
        .join(proportion, "car")
        .withColumn("qtty",round( ((col("to_distribute") / col("proportion")) * col("qtty")) + col("qtty"), 2 ))
        .drop("to_distribute","proportion")
        .union(p1.drop("to_distribute"))

      result.show()
/*
+---+-----+------+
|car|pcode|  qtty|
+---+-----+------+
|  A|  444| 69.29|
|  A|  222| 27.71|
|  C|  444| 21.43|
|  C|  555|128.57|
|  A|  666|  65.0|
|  B|  555|  20.0|
|  C|  666|  70.0|
|  A|  888|  88.0|
+---+-----+------+
*/

      expected
        .toDF("car","pcode","qtty")
        .show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty  |
+---+-----+------+
|A  |666  |65.0  |
|B  |555  |20.0  |
|A  |444  |69.29 |
|A  |222  |27.71 |
|C  |444  |21.43 |
|C  |666  |70.0  |
|C  |555  |128.57|
|A  |888  |88.0  |
+---+-----+------+
*/

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

Some general notes about performance and tuning.

It always depends on your particular use case, but in general, if you can filter and clean the data first, you could see some improvement.
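
For example, if only cars that actually appear in the invoices matter for a given step, both sides can be pruned before the joins (a hypothetical illustration reusing the owners/invoices datasets from above; rows for other cars could be unioned back unchanged afterwards):

// Hypothetical pruning step: keep only owners whose car appears in the invoices,
// so less data is shuffled in the subsequent joins and aggregations.
val carsWithInvoices = invoices.select("car").distinct()
val prunedOwners     = owners.join(carsWithInvoices, Seq("car"), "left_semi")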

The whole point of using a high-level declarative API is to isolate yourself from low-level implementation details. Optimization is the job of the Catalyst optimizer. It is a sophisticated engine, and I really doubt anyone can easily improve on it without diving much deeper into its internals.

Default number of partitions property: spark.sql.shuffle.partitions. Set it up properly.

By default, Spark SQL uses spark.sql.shuffle.partitions partitions for aggregations and joins, i.e. 200 by default. That often leads to an explosion of partitions that does nothing for the performance of a query, since all 200 tasks (one per partition) have to start and finish before you get the result.

Think about how many partitions your query really requires.
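
For a small dataset like this one, a value far below the default 200 is enough. It can be set on the builder (as in the code above) or changed at runtime; the value 8 below is only an illustration:

// Adjust the shuffle parallelism at runtime for this job (illustrative value).
spark.conf.set("spark.sql.shuffle.partitions", "8")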

Spark can only run one concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions. As for choosing a "good" number of partitions, you generally want at least as many as the number of parallel executors. You can get this computed value by calling:

sc.defaultParallelism

or inspect the number of RDD partitions with:

df.rdd.partitions.size

Repartition: increases the number of partitions; rebalancing partitions after a filter increases parallelism. repartition(numPartitions: Int)

Coalesce: decreases the number of partitions WITHOUT a shuffle; consolidate before writing to HDFS/external storage. coalesce(numPartitions: Int, shuffle: Boolean = false)
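
A minimal sketch of both calls (the partition counts and the output path are placeholders, not recommendations):

import org.apache.spark.sql.functions.col

// Rebalance after a selective filter to restore parallelism (placeholder count).
val rebalanced = owners.filter(col("qtty") > 0).repartition(8)

// Reduce the number of output files without a shuffle before writing (hypothetical path).
result.coalesce(1).write.mode("overwrite").parquet("/tmp/redistributed_quantities")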

You can follow this link: Managing Spark Partitions with Coalesce and Repartition

Cache data to avoid recomputation: dataFrame.cache()
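
A short sketch of the pattern (the count just forces materialization; unpersist releases the memory once the results are no longer needed):

// Cache a dataset that several downstream queries reuse, then release it.
val cachedOwners = owners.cache()
cachedOwners.count()      // materializes the cache
// ... run the queries that reuse cachedOwners ...
cachedOwners.unpersist()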

Analyzer - Logical Query Plan Analyzer

Analyzer is the logical query plan analyzer in Spark SQL that semantically validates and transforms an unresolved logical plan to an analyzed logical plan.

You can access the analyzed logical plan of a Dataset using explain (with the extended flag enabled):

dataframe.explain(extended = true)

For further performance options see the documentation: Performance Tuning

There are a lot of possibilities for tuning Spark processes, but it always depends on your use case.

Batch or streaming process? DataFrames or plain RDDs? Hive or no Hive? Shuffled data or not? Etc.

I strongly recommend The Internals of Spark SQL by Jacek Laskowski.

Finally, you will have to run some trials with different values and benchmark them to see how much time the process takes with a data sample.

  val start = System.nanoTime()

  // my process

  val end = System.nanoTime()

  val time = end - start
  println(s"My App takes: $time")

Hope this helps.
