Why is Spark DataFrame creating the wrong number of partitions?
Question
I have a Spark dataframe with 2 columns - col1 and col2.
scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
When I write df to disk in parquet format, in order to write all the data in a number of files equal to the number of unique values in col1, I do a repartition using col1, like this:
scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")
The above code produces only one file in the filesystem, but the number of shuffle partitions becomes 200.
One thing I am not able to understand here: if col1 contains only one value, i.e. 1, then why does repartition create 200 partitions?
Answer
repartition(columnName) by default creates 200 partitions (more specifically, spark.sql.shuffle.partitions partitions), no matter how many unique values of col1 there are. If there is only 1 unique value of col1, then 199 of the partitions are empty. On the other hand, if you have more than 200 unique values of col1, you will have multiple values of col1 per partition.
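The 200 comes from hash partitioning: conceptually, each row is assigned to partition hash(col1) mod spark.sql.shuffle.partitions. A minimal pure-Scala sketch of that mechanic (Spark actually uses a Murmur3 hash rather than hashCode, so the exact target partition differs, but the effect is the same: one distinct value maps to exactly one of the 200 partitions):

```scala
// Sketch only: Spark's hashpartitioning really uses Murmur3, not hashCode,
// but the mechanics are the same - value -> hash -> non-negative modulo.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  raw + (if (raw < 0) mod else 0)
}

val numPartitions = 200 // default value of spark.sql.shuffle.partitions
val target = nonNegativeMod(1.hashCode, numPartitions)
println(s"every row with col1 = 1 lands in partition $target; the other 199 stay empty")
```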
If you only want 1 partition, then you can do repartition(1, col("col1")) or just coalesce(1). But note that coalesce does not behave the same, in the sense that coalesce may be moved further up in your code such that you may lose parallelism (see How to prevent Spark optimization)
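For completeness, both options look roughly like this in a spark-shell session (a sketch, assuming the df and the "file" path from the question; not runnable outside a Spark session):

```scala
// Sketch, assuming the df from the question above.
// Option 1: pass an explicit target partition count to repartition.
df.repartition(1, col("col1")).write.partitionBy("col1").parquet("file")

// Option 2: lower the shuffle default for the whole session,
// so hash-based repartition(col(...)) produces a single partition.
spark.conf.set("spark.sql.shuffle.partitions", "1")
```

Option 2 is a session-wide setting and affects every subsequent shuffle, so Option 1 is usually the safer, more local fix.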
If you want to check the content of your partitions, I've made 2 methods for this:
import org.apache.spark.sql.DataFrame

// calculates the record count per partition
def inspectPartitions(df: DataFrame) = {
  import df.sqlContext.implicits._
  df.rdd.mapPartitions(partIt => Iterator(partIt.size)).toDF("record_count")
}
// inspects how a given key is distributed across the partitions of a dataframe
def inspectPartitions(df: DataFrame, key: String) = {
  import df.sqlContext.implicits._
  df.rdd.mapPartitions(partIt => {
    val part = partIt.toSeq // toSeq, not toSet: a Set would deduplicate rows and skew the count
    val partKeys = part.map(r => r.getAs[Any](key).toString.trim).distinct
    Iterator((partKeys.toArray, part.size))
  }).toDF("partitions", "record_count")
}
Now you can e.g. check your dataframe like this:
inspectPartitions(df.repartition(col("col1")), "col1")
  .where($"record_count" > 0)
  .show