Spark Dataset/Dataframe join NULL skew key


Question

Working with Spark Dataset/DataFrame joins, I faced long-running jobs that failed with OOM.

Here's the input:

  • ~10 datasets of different sizes, mostly huge (> 1 TB)
  • all of them left-joined to one base dataset
  • some of the join keys are null

After some analysis, I found that the cause of the failed and slow jobs was null key skew: the left side has millions of records with a null join key.
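A minimal sketch that reproduces the problem (the session setup, row counts, and column names are illustrative, not from my actual job): in a sort-merge join, rows are shuffled by the hash of the join key, so every null-keyed row on the left hashes to the same partition and forms one huge straggler task.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, lit, rand, when}

  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  // Left side: 10M rows, ~90% of them with a null join key.
  val left = spark
    .range(10000000L)
    .withColumn("key", when(rand() < 0.9, lit(null).cast("long")).otherwise(col("id")))

  // Right side: 1M rows with unique, non-null keys.
  val right = spark.range(1000000L).withColumnRenamed("id", "key")

  // Naive left outer join: all null-keyed left rows land in a single
  // shuffle partition, producing the long-running task (and, at scale, OOM).
  val joined = left.join(right, Seq("key"), "left")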

I took a brute-force approach to solve this issue, which I'd like to share here.

If you have a better solution, or any built-in one (for regular Apache Spark), please share it.

Answer

Here's my solution:

  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{col, lit, negate, pmod, rand, when}
  import org.apache.spark.sql.types.LongType

  /**
    * Expression that produces a negative random number between -1 and -`lowestValue` (inclusive).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produces a positive random number between 0 (inclusive) and `highestValue` (exclusive).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
    pmod((rand() * highestValue).cast(LongType), lit(highestValue))
  }
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * An optimized version of left outer join for the case where the left side of the join has a skewed `null` key.
      *
      * @note
      *       It works only for a single-column join for which `isNotNull` is applicable.
      *
      * Optimization algorithm:
      *   1. replace the left dataset's `null` values with a negative random number between -1 and -`nullNumBuckets` (10000 by default)
      *   2. use the appended column, holding the original join column values plus the `null` replacements, as the left dataset's join column;
      *      the appended column name is built from the original left join column and `skewedColumnPostFix`, separated by an underscore.
      *
      * @note there is no check on how many `null` values the left dataset has before applying the above steps,
      *       nor any check on whether the join is a sort-merge join or a broadcast join.
      *
      * IMPORTANT: if the left dataset already has the appended column, it is reused, to benefit from already-repartitioned data on the left.
      *
      * HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`.
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }

In short: I replace the left dataset's null join key values with a range of negative numbers, so that the rows are evenly repartitioned.
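For illustration, a hypothetical invocation of the extension method above (leftDF, rightDF, and the user_id column are placeholder names, not from the original code):

  // Hypothetical usage; assumes the implicit class SkewedDataFrameExt is in scope
  // and that both DataFrames share a "user_id" join column.
  val joined = leftDF.nullSkewLeftJoin(
    right        = rightDF,
    joinLeftCol  = leftDF("user_id"),
    joinRightCol = rightDF("user_id")
  )

  // The appended helper column ("user_id_skewed_column" with the default
  // postfix) can be dropped once the join is done.
  val result = joined.drop("user_id_skewed_column")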

NOTE: this solution is only for a left join with a null-skewed join key. I didn't want to explode the right dataset and apply a skew solution for arbitrary keys. Also, after this step, the null join key values are distributed across different partitions, so mapPartitions etc. won't work on them.

In summary, the above solution helped me, but I'd like to see more solutions for this type of dataset join issue.
