Dropping empty DataFrame partitions in Apache Spark

Question

I am trying to repartition a DataFrame by a column x in which the DataFrame has N (say N=3) distinct values, e.g.:

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data

What I would like to achieve is to repartition myDF by x without producing empty partitions. Is there a better way than doing this?

val numParts = myDF.select($"x").distinct().count.toInt // number of distinct values in x
myDF.repartition(numParts, $"x")

(If I don't specify numParts in repartition, most of my partitions are empty, because repartitioning by a column falls back to spark.sql.shuffle.partitions, which defaults to 200 partitions ...)
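For reference, the 200 comes from Spark SQL's spark.sql.shuffle.partitions setting, so one option is to lower it before repartitioning. This is only a sketch (the repartitioned name is made up here for illustration), and hash collisions between values of x can still leave the occasional partition empty:

spark.conf.set("spark.sql.shuffle.partitions", "3") // lower the default shuffle partition count
val repartitioned = myDF.repartition($"x")          // now produces 3 partitions instead of 200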

Answer

The solution I would think of is to iterate over the DataFrame's partitions, counting the records in each one to find the non-empty partitions.

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart")

df.foreachPartition(partition =>
  if (partition.length > 0) nonEmptyPart.add(1)) // partition is an Iterator[Row]; bump the accumulator for every partition holding at least one row

Since we now know the number of non-empty partitions (nonEmptyPart), we can clean up the empty ones with coalesce() (see coalesce() vs repartition()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type

It may or may not be the best approach, but this solution avoids a shuffle, since we are not using repartition(). The complete example:

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

df1.foreachPartition(partition =>
  if (partition.length > 0) nonEmptyPart.add(1))

val finalDf = df1.coalesce(nonEmptyPart.value.toInt)

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

Output

nonEmptyPart => 3
df.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3
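As a quick sanity check (this snippet is not part of the original answer, just an illustration), you can count the rows that end up in each partition of finalDf:

finalDf.rdd
  .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size))) // (partition index, row count)
  .collect()
  .foreach { case (idx, cnt) => println(s"partition $idx -> $cnt rows") }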
