Spark Dataset/DataFrame join NULL skew key
Question
Working with Spark Dataset/DataFrame joins, I ran into long-running jobs that eventually failed with OOM errors.
The input here:
- ~10 datasets of different sizes, most of them huge (> 1 TB)
- all left-joined to one base dataset
- some of the join keys are `null`
After some analysis, I found that the cause of the failed and slow jobs was a `null` skew key: the left side has millions of records whose join key is `null`.
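To see why this skews, consider a simplified model of hash partitioning (an illustration only, not Spark's exact shuffle internals): every row with the same key lands in the same partition, and all `null` keys hash identically, so millions of `null`-keyed rows pile onto a single task.

```scala
// Simplified model of hash partitioning (illustration only; Spark's real
// shuffle hashing differs in detail, but the skew mechanism is the same).
def partitionFor(key: Any, numPartitions: Int): Int = {
  val h = if (key == null) 0 else key.hashCode // every null hashes identically
  ((h % numPartitions) + numPartitions) % numPartitions
}

// A left side where most join keys are null:
val keys: Seq[Integer] =
  Seq.fill(1000)(null.asInstanceOf[Integer]) ++ (1 to 100).map(Int.box)

val partitionSizes: Map[Int, Int] =
  keys.groupBy(partitionFor(_, 200)).map { case (p, ks) => p -> ks.size }

// All 1000 null keys end up in one partition, while the 100 real keys
// spread out with at most one row each.
val largest = partitionSizes.values.max
```

With 200 partitions, one partition receives all 1000 `null` rows while every other partition holds at most one row; at real scale that one partition is the task that runs forever or dies with OOM.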
I took a somewhat brute-force approach to solve this issue, and I want to share it here.
If you have a better solution, or any built-in one (for regular Apache Spark), please share it.
Answer
Here is my solution:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, negate, pmod, rand, when}
import org.apache.spark.sql.types.LongType

/**
 * Expression that produces a negative random number between -1 and -`lowestValue` (inclusive).
 *
 * @example
 * {{{
 * spark
 *   .range(1, 100)
 *   .withColumn("negative", negativeRandomWithin(3))
 *   .select("negative")
 *   .distinct()
 *   .show(false)
 * }}}
 * +--------+
 * |negative|
 * +--------+
 * |-2      |
 * |-3      |
 * |-1      |
 * +--------+
 */
private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
  negate(positiveRandomWithin(lowestValue)) - 1
}
/**
 * Expression that produces a positive random number between 0 (inclusive) and `highestValue` (exclusive).
 *
 * @example
 * {{{
 * spark
 *   .range(1, 100)
 *   .withColumn("positive", positiveRandomWithin(3))
 *   .select("positive")
 *   .distinct()
 *   .show(false)
 * }}}
 * +--------+
 * |positive|
 * +--------+
 * |0       |
 * |1       |
 * |2       |
 * +--------+
 */
private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
  pmod((rand * highestValue).cast(LongType), lit(highestValue))
}
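The same logic can be written in plain Scala to sanity-check the ranges (a model of the two `Column` expressions above, not the expressions themselves): `positiveRandomWithin` yields values in `[0, highestValue)`, and `negativeRandomWithin` shifts them down into `[-lowestValue, -1]`.

```scala
import scala.util.Random

// Plain-Scala model of the two Column expressions above, useful for
// checking the ranges they produce.
def positiveRandomWithinModel(highestValue: Long): Long = {
  val r = (Random.nextDouble() * highestValue).toLong // 0 to highestValue - 1
  ((r % highestValue) + highestValue) % highestValue  // pmod, as in the expression
}

def negativeRandomWithinModel(lowestValue: Long): Long =
  -positiveRandomWithinModel(lowestValue) - 1         // -1 down to -lowestValue

val positives = Seq.fill(1000)(positiveRandomWithinModel(3))
val negatives = Seq.fill(1000)(negativeRandomWithinModel(3))
```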
implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

  /**
   * Optimized version of a left outer join where the left side has a skewed `null` field.
   *
   * @note It works only for a single-column join to which `isNotNull` is applicable.
   *
   * Optimization algorithm:
   *   1. replace left dataset `null` values with a negative number between -1 and -`nullNumBuckets` (10000 by default)
   *   2. use the appended column, holding the original join column value plus the `null` replacements,
   *      as the join column on the left dataset; the appended column name is built from the original
   *      left join column and `skewedColumnPostFix`, separated by an underscore
   *
   * @note there are no checks on how many `null` values the left dataset has before applying the steps above,
   *       nor on whether the join is a sort merge join or a broadcast join.
   *
   * IMPORTANT: if the left dataset already has the appended column, it is reused, to benefit from
   * data already repartitioned on the left
   *
   * HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`
   */
  private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                               joinLeftCol: Column,
                                               joinRightCol: Column,
                                               skewedColumnPostFix: String = "skewed_column",
                                               nullNumBuckets: Int = 10000): DataFrame = {

    val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

    if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
      underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
    } else {
      underlying
        .withColumn(skewedTempColumn,
          when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
        .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
    }
  }
}
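As a sanity check on the semantics, here is a plain-Scala model of the transformation (simplified to key/value sequences; the real method works on DataFrames): `null` keys are replaced with random negative salts, and because the right side is filtered to non-null keys (and, per the caveat above, must contain no negative keys), salted rows can never match, exactly as in a plain left join.

```scala
import scala.util.Random

// Plain-Scala model of nullSkewLeftJoin on (key, value) pairs.
def nullSkewLeftJoinModel(
    left: Seq[(Option[Long], String)],
    right: Map[Long, String],
    nullNumBuckets: Int = 10000): Seq[(Long, String, Option[String])] = {
  left.map {
    case (Some(k), v) => (k, v, right.get(k))           // normal left join lookup
    case (None, v)    =>                                // salt: -1 .. -nullNumBuckets
      val salt = -Random.nextInt(nullNumBuckets).toLong - 1
      (salt, v, None)                                   // salted keys never match
  }
}

val left   = Seq(Some(1L) -> "a", None -> "b", Some(2L) -> "c", None -> "d")
val right  = Map(1L -> "x", 2L -> "y")
val joined = nullSkewLeftJoinModel(left, right)
```

Every output row is preserved (left join semantics), rows with real keys pick up their right-side match, and the formerly-`null` rows carry distinct negative keys, which is what lets the shuffle spread them across partitions.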
In short: I replace the left dataset's `null` join key values with a negative range, so the data is repartitioned evenly.
NOTE: this solution is only for left joins with `null` join key skew. I didn't want to explode the right dataset and apply a skew solution for arbitrary keys. Also, after this step, the `null` join key values are distributed across different partitions, so `mapPartitions` etc. won't work on them.
In summary, the solution above helped me, but I'd like to see more solutions for this type of dataset join issue.