Dropping empty DataFrame partitions in Apache Spark


Problem description

I am trying to repartition a DataFrame according to a column. The DataFrame has N (let's say N=3) distinct values in the partition column x, e.g.:

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data

What I'd like to achieve is to repartition myDF by x without producing empty partitions. Is there a better way than doing this?

val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")

(If I don't specify numParts in repartition, most of my partitions end up empty, since repartition falls back to creating 200 partitions by default ...)
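As an aside, the 200 comes from Spark's spark.sql.shuffle.partitions setting, which repartition($"x") falls back to when no partition count is given. A minimal sketch for inspecting or lowering it (assuming a SparkSession named spark, as in spark-shell):

// Default number of partitions used by shuffles such as repartition($"x")
println(spark.conf.get("spark.sql.shuffle.partitions")) // "200" by default

// Lowering it affects all subsequent shuffles in this session
spark.conf.set("spark.sql.shuffle.partitions", "3")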

Recommended answer

I'd suggest a solution that iterates over the df partitions and counts the records in each, in order to find the non-empty partitions.

// An accumulator collects the count of non-empty partitions on the driver
val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart")

// foreachPartition is an action, so the accumulator value is reliable here
df.foreachPartition(partition =>
  if (partition.length > 0) nonEmptyPart.add(1))
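Note that partition here is an Iterator, and calling length consumes the whole iterator just to count it. A hedged variant (my tweak, not part of the original answer) reaches the same result by checking only for a first record:

df.foreachPartition(partition =>
  if (partition.hasNext) nonEmptyPart.add(1)) // true as soon as a record exists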

Once we have the number of non-empty partitions (nonEmptyPart), we can get rid of the empty ones by using coalesce() (check coalesce() vs. repartition()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type

It may or may not be the best approach, but this solution avoids shuffling, since we are not using repartition():

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

// Count the partitions that actually hold records
df1.foreachPartition(partition =>
  if (partition.length > 0) nonEmptyPart.add(1))

// Shrink to exactly that many partitions without a shuffle
val finalDf = df1.coalesce(nonEmptyPart.value.toInt)

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df1.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

Output

nonEmptyPart => 3
df1.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3
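As a quick sanity check (a sketch assuming the same spark-shell session; the exact plan text varies by Spark version), explain() and glom() can confirm what happened:

// coalesce() appears as a Coalesce node; it adds no new Exchange (shuffle) on top of df1's plan
finalDf.explain()

// glom() materializes each partition as an array, so use it only on tiny data like this
println(df1.rdd.glom().map(_.length).collect().count(_ > 0))       // 3 partitions hold records
println(finalDf.rdd.glom().map(_.length).collect().mkString(", ")) // e.g. 2, 2, 2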

