Dropping empty DataFrame partitions in Apache Spark
Question
I am trying to repartition a DataFrame according to a column: the DataFrame has N (let's say N=3) different values in the partition column x, e.g.:
val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data
What I'd like to achieve is to repartition myDF by x without producing empty partitions. Is there a better way than doing this?
val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts, $"x")
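If computing the exact distinct count is too expensive on a large DataFrame, one variation (a sketch, not part of the original question) is to size the repartition with approx_count_distinct instead; the trade-off is that an approximate count may slightly over- or under-estimate the number of groups:

```scala
import org.apache.spark.sql.functions.approx_count_distinct

// Sketch: size the repartition from an approximate distinct count.
// Assumption: a small estimation error in the partition count is acceptable.
val numPartsApprox = myDF
  .agg(approx_count_distinct($"x"))
  .first()
  .getLong(0)
  .toInt
myDF.repartition(numPartsApprox, $"x")
```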
(If I don't specify numParts in repartition, most of my partitions are empty, since repartition creates 200 partitions by default ...)
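The 200 comes from Spark SQL's default shuffle parallelism. As a sketch (the config key is standard Spark, but changing it is session-wide and may affect other shuffles in the job), you could lower spark.sql.shuffle.partitions before a column-based repartition:

```scala
// Sketch: spark.sql.shuffle.partitions controls how many partitions
// a column-based repartition (and any other shuffle) produces by default.
// Note: this is a session-wide setting, not specific to one DataFrame,
// and hash collisions can still leave some partitions empty.
spark.conf.set("spark.sql.shuffle.partitions", "3")
val byX = myDF.repartition($"x") // now yields 3 partitions instead of 200
```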
Answer
One solution I'd think of is to iterate over the df partitions and count the records in each one to find the non-empty partitions.
val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart")

df.foreachPartition(partition =>
  if (partition.hasNext) nonEmptyPart.add(1)) // hasNext checks emptiness without consuming the whole iterator
Now that we have the number of non-empty partitions (nonEmptyPart), we can drop the empty ones by using coalesce() (check coalesce() vs repartition()).
val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type
It may or may not be the best approach, but this solution avoids a shuffle because we are not using repartition():
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

df1.foreachPartition(partition =>
  if (partition.hasNext) nonEmptyPart.add(1))

val finalDf = df1.coalesce(nonEmptyPart.value.toInt)

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df1.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

Output

nonEmptyPart => 3
df1.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3