Spark: Order of column arguments in repartition vs partitionBy
Methods taken into consideration (Spark 2.2.1):

- DataFrame.repartition (the two implementations that take partitionExprs: Column* parameters)
- DataFrameWriter.partitionBy

Note: This question doesn't ask about the difference between these methods.
From the docs of partitionBy:

If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a Dataset by year and then month, the directory layout would look like:

- year=2016/month=01/
- year=2016/month=02/
From this, I infer that the order of column arguments will decide the directory layout; hence it is relevant.
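This layout logic can be sketched in plain Python (a toy illustration of the Hive-style scheme, not Spark's actual writer code): each partition column, in the order given to partitionBy, contributes one col=value directory level.

```python
def hive_partition_path(row, partition_cols):
    # One "col=value" directory segment per partition column,
    # in the order the columns were specified.
    return "/".join(f"{col}={row[col]}" for col in partition_cols)

row = {"year": 2016, "month": 1}
hive_partition_path(row, ["year", "month"])  # 'year=2016/month=1'
hive_partition_path(row, ["month", "year"])  # 'month=1/year=2016'
```

Swapping the column order swaps the nesting of the output directories, which is why the argument order is clearly relevant for partitionBy.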
From the docs of repartition:

Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions. The resulting Dataset is hash partitioned.
As per my current understanding, repartition decides the degree of parallelism in handling the DataFrame. With this definition, the behaviour of repartition(numPartitions: Int) is straightforward, but the same can't be said about the other two implementations of repartition that take partitionExprs: Column* arguments.
All things said, my doubts are the following:
- Like the partitionBy method, is the order of column inputs relevant in the repartition method too?
  - If yes: please explain the behaviour of the repartition(columnExprs: Column*) method.
  - If no: does each chunk extracted for parallel execution contain the same data as would have been in each group had we run a SQL query with GROUP BY on the same columns?
- What is the relevance of having both numPartitions: Int and partitionExprs: Column* arguments in the third implementation of repartition?
The only similarity between these two methods is their names. They are used for different things and have different mechanics, so you shouldn't compare them at all.
That being said, repartition shuffles data using:

- With partitionExprs, it uses a hash partitioner on the columns used in the expressions, with spark.sql.shuffle.partitions as the number of partitions.
- With partitionExprs and numPartitions, it does the same as the previous one, but overrides spark.sql.shuffle.partitions.
- With numPartitions alone, it just rearranges the data using RoundRobinPartitioning.
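The hash-based variants can be sketched roughly in plain Python. This is only an approximation of the mechanism, not of the exact bucket assignments: Spark actually applies a Murmur3 hash to the column values, while this sketch uses Python's built-in hash.

```python
def assign_partition(row, cols, num_partitions):
    # Hash the tuple of partitioning-column values, then bucket
    # modulo num_partitions (Spark uses Murmur3, not Python's hash).
    return hash(tuple(row[c] for c in cols)) % num_partitions

# Same 5 x 5 grid of rows as the example below.
rows = [{"x": str(x), "y": y} for x in range(5) for y in range(5)]
by_xy = [assign_partition(r, ["x", "y"], 4) for r in rows]
by_yx = [assign_partition(r, ["y", "x"], 4) for r in rows]
# Every row lands in one of the 4 requested partitions; the two
# column orderings generally produce different assignments.
```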
Is the order of column inputs relevant in the repartition method too?

It is. hash((x, y)) is in general not the same as hash((y, x)):
df = (spark.range(5, numPartitions=4).toDF("x")
      .selectExpr("cast(x as string)")
      .crossJoin(spark.range(5, numPartitions=4).toDF("y")))

df.repartition(4, "y", "x").rdd.glom().map(len).collect()
# [8, 6, 9, 2]

df.repartition(4, "x", "y").rdd.glom().map(len).collect()
# [6, 4, 3, 12]
Does each chunk extracted for parallel execution contain the same data as would have been in each group had we run a SQL query with GROUP BY on same columns?
It depends on what the exact question is.
- Yes. GROUP BY with the same set of columns will result in the same logical distribution of keys over partitions.
- No. A hash partitioner can map multiple keys to the same partition, whereas GROUP BY "sees" only the actual groups.
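The distinction in the second point can be sketched with a toy hash partitioner: several distinct keys can share a partition, yet they remain distinct groups.

```python
from collections import defaultdict

keys = ["a", "b", "c", "d"]  # four distinct grouping keys
num_partitions = 2

partitions = defaultdict(list)
for k in keys:
    partitions[hash(k) % num_partitions].append(k)

# By the pigeonhole principle, at least one of the 2 partitions holds
# more than one key; GROUP BY would still treat each key as its own group.
```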
Related: How to define partitioning of DataFrame?