Spark Dataset/DataFrame join NULL skew key

Problem description

Working with Spark Dataset/DataFrame joins, I faced long-running jobs that failed with OOM.

Input (a sketch of this join chain follows the list):

  • ~10 datasets of different sizes, mostly huge (>1 TB)
  • all left joined to one base dataset
  • some of the join keys are null
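
To make the setup concrete, here's a minimal sketch of that join chain (all dataset and column names below are hypothetical placeholders, not the real ones):

  // One base dataset, left joined to ~10 large datasets on a nullable key.
  // `base`, `profiles`, `purchases` and `user_id` are placeholder names.
  val enriched = base
    .join(profiles, base("user_id") === profiles("user_id"), "left")
    .join(purchases, base("user_id") === purchases("user_id"), "left")
  // ... more left joins of the same shape, one per input dataset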

After some analysis, I found that the cause of the failed and slow jobs was a null skew key: the left side has millions of records whose join key is null.
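
The skew is easy to confirm with a quick key-frequency count (a sketch; `left` and `user_id` are placeholder names):

  import org.apache.spark.sql.functions.desc

  // Count records per join key and inspect the heaviest keys; a huge
  // `null` bucket at the top confirms the null skew.
  left.groupBy("user_id")
    .count()
    .orderBy(desc("count"))
    .show(10, false)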

I took a somewhat brute-force approach to solve this issue, and I want to share it here.

If you have a better solution, or any built-in one (for regular Apache Spark), please share it.

Answer

Here's the solution I came to:

  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types.LongType

  /**
    * Expression that produces a negative random number between -1 and -`lowestValue` (inclusive).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produces a positive random number between 0 (inclusive) and `highestValue` (exclusive).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
    pmod((rand() * highestValue).cast(LongType), lit(highestValue))
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * A specially optimized version of left outer join for the case where the left side has a skewed `null` join key.
      *
      * @note
      *       It works only for a single-column join to which `isNotNull` is applicable.
      *
      * Optimization algorithm:
      *   1. replace the left dataset's `null` values with a negative number in the range between -1 and -`nullNumBuckets` (10000 by default)
      *   2. use an appended column, holding the original join column values plus the `null` replacements, as the join column from the left dataset;
      *      the appended column name is built from the original left join column and `skewedColumnPostFix`, separated by an underscore.
      *
      * @note there are no checks on how many `null` values the left dataset has before applying the above steps,
      *       nor any check whether the join is a sort-merge join or a broadcast join.
      *
      * IMPORTANT: if the left dataset already has the appended column, it will be reused, to benefit from data already repartitioned on the left.
      *
      * HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`.
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }

In short: I replace the left dataset's null join key values with a negative range, to get the data evenly repartitioned.
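
For illustration, a usage sketch (dataset and column names are hypothetical; since the method is package-private, this assumes the caller lives in the same `transformation` package):

  // `users` (left side, with a nullable user_id) and `orders` (right side)
  // are placeholder DataFrames joined on `user_id`.
  val joined = users.nullSkewLeftJoin(
    orders,
    joinLeftCol  = users("user_id"),
    joinRightCol = orders("user_id")
  )

  // The join appended a helper column named "user_id_skewed_column";
  // it can be dropped once the join is done.
  val result = joined.drop("user_id_skewed_column")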

NOTE: this solution is only for a left join with null join key skew. I didn't want to explode the right dataset and apply a skew solution for arbitrary keys. Also, after this step, records with a null join key are distributed across different partitions, so mapPartitions etc. won't work.

To summarize: the above solution helped me, but I'd like to see more solutions for this type of dataset join issue.
