Why is Spark DataFrame creating the wrong number of partitions?


Problem Description

I have a Spark DataFrame with 2 columns - col1 and col2.

scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]

When I write df to disk in Parquet format, to write all the data into a number of files equal to the number of unique values in col1, I do a repartition using col1, like this:

scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")

The above code produces only one file in the filesystem. However, the number of shuffle partitions becomes 200.
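
A quick way to confirm this is to look at the partition count of the repartitioned DataFrame. A minimal sketch, assuming a spark-shell session with df defined as above:

import org.apache.spark.sql.functions.col

// With default settings this prints 200, the value of spark.sql.shuffle.partitions.
println(df.repartition(col("col1")).rdd.getNumPartitions)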

One thing I cannot understand here: if col1 contains only one value, i.e. 1, then why does repartition create 200 partitions?

Recommended Answer

repartition(columnName) by default creates 200 partitions (more specifically, spark.sql.shuffle.partitions partitions), no matter how many unique values of col1 there are. If there is only 1 unique value of col1, then 199 of the partitions are empty. On the other hand, if there are more than 200 unique values of col1, each partition will hold multiple values of col1.
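
To illustrate the role of that setting, the shuffle-partition count can be lowered before repartitioning. A minimal sketch, assuming a spark-shell session where spark is the active SparkSession and df is the dataframe from the question:

import org.apache.spark.sql.functions.col

// spark.sql.shuffle.partitions controls how many partitions a column-based
// repartition (like any other shuffle) produces.
spark.conf.set("spark.sql.shuffle.partitions", "1")
println(df.repartition(col("col1")).rdd.getNumPartitions)  // now prints 1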

If you only want 1 partition, then you can do repartition(1, col("col1")) or just coalesce(1). But note that coalesce does not behave the same way: coalesce may be moved further up in your code, such that you may lose parallelism (see How to prevent Spark optimization).
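
Applied to the write from the question, either variant yields a single output file. A sketch reusing the hypothetical output path "file" from above; mode("overwrite") is added only so the snippets can be rerun:

// Option 1: shuffle by col1 into exactly one partition, then write.
df.repartition(1, col("col1")).write.mode("overwrite").partitionBy("col1").parquet("file")

// Option 2: collapse to a single partition without a hash shuffle.
// Beware: coalesce(1) can be pushed further up the plan and reduce upstream parallelism.
df.coalesce(1).write.mode("overwrite").partitionBy("col1").parquet("file")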

If you want to check the content of your partitions, I've made 2 methods for this:

import org.apache.spark.sql.DataFrame

// Calculates the record count per partition of the dataframe.
def inspectPartitions(df: DataFrame) = {
  import df.sqlContext.implicits._
  df.rdd
    .mapPartitions(partIt => Iterator(partIt.size))
    .toDF("record_count")
}

// Inspects how a given key is distributed across the partitions of a dataframe:
// for each partition, lists the distinct key values it contains and its record count.
def inspectPartitions(df: DataFrame, key: String) = {
  import df.sqlContext.implicits._
  df.rdd
    .mapPartitions { partIt =>
      val part = partIt.toSeq
      val partKeys = part.map(r => r.getAs[Any](key).toString.trim).distinct
      Iterator((partKeys.toArray, part.size))
    }
    .toDF("partitions", "record_count")
}

Now you can, for example, check your dataframe like this:

inspectPartitions(df.repartition(col("col1")), "col1")
  .where($"record_count" > 0)
  .show
