Spark: Order of column arguments in repartition vs partitionBy


Question

Methods taken into consideration (Spark 2.2.1):

  1. DataFrame.repartition (the two implementations that take partitionExprs: Column* parameters)
  2. DataFrameWriter.partitionBy

Note: This question is not asking about the difference between these methods.

From the docs of partitionBy:

If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a Dataset by year and then month, the directory layout would look like:

  • year=2016/month=01/
  • year=2016/month=02/

From this, I infer that the order of column arguments will decide the directory layout; hence it is relevant.
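
This is easy to check; below is a minimal PySpark sketch (the SparkSession spark, the sample data, and the output paths are illustrative, not from the original question):

df = spark.createDataFrame(
    [(2016, 1, "a"), (2016, 2, "b")], ["year", "month", "value"])

# partitionBy("year", "month") nests month directories under year:
#   /tmp/by_year_month/year=2016/month=1/ ...
df.write.partitionBy("year", "month").parquet("/tmp/by_year_month")

# Reversing the argument order nests year under month instead:
#   /tmp/by_month_year/month=1/year=2016/ ...
df.write.partitionBy("month", "year").parquet("/tmp/by_month_year")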

From the docs of repartition:

Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. The resulting Dataset is hash partitioned.

As per my current understanding, repartition decides the degree of parallelism in handling the DataFrame. With this definition, behaviour of repartition(numPartitions: Int) is straightforward but the same can't be said about the other two implementations of repartition that take partitionExprs: Column* arguments.
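
For the numPartitions-only overload, here is a minimal sketch (assuming a SparkSession named spark) of how it fixes the degree of parallelism:

df = spark.range(100)

# repartition(numPartitions) shuffles the rows into exactly 8 partitions
print(df.repartition(8).rdd.getNumPartitions())     # 8

# repartition(col) hash-partitions on the column and falls back to
# spark.sql.shuffle.partitions (200 by default) for the partition count
print(df.repartition("id").rdd.getNumPartitions())  # 200 unless reconfigured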


All things said, my doubts are the following:

  • Like the partitionBy method, is the order of column inputs relevant in the repartition method too?
  • If the answer to the above question is
    • No: Does each chunk extracted for parallel execution contain the same data as would have been in each group had we run a SQL query with GROUP BY on the same columns?
    • Yes: Please explain the behaviour of the repartition(columnExprs: Column*) method
  • What is the relevance of having both numPartitions: Int and partitionExprs: Column* arguments in the third implementation of repartition?

Solution

The only similarity between these two methods is their names. They are used for different things and have different mechanics, so you shouldn't compare them at all.

That being said, repartition shuffles data as follows (the sketch after the list below makes each case visible in the physical plan):

  • With partitionExprs only, it uses a hash partitioner on the columns used in the expressions, with spark.sql.shuffle.partitions as the number of partitions.
  • With both partitionExprs and numPartitions, it does the same as the previous case, but overrides spark.sql.shuffle.partitions.
  • With numPartitions only, it just rearranges the data using RoundRobinPartitioning.
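
These three cases can be verified in the physical plan; here is a sketch (assuming a SparkSession named spark; the plan text in the comments is abridged):

df = spark.range(10).toDF("x")

df.repartition(4).explain()       # Exchange RoundRobinPartitioning(4), ...
df.repartition("x").explain()     # Exchange hashpartitioning(x#..., 200), ...
df.repartition(4, "x").explain()  # Exchange hashpartitioning(x#..., 4), ...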

Is the order of column inputs relevant in the repartition method too?

It is. hash((x, y)) is in general not the same as hash((y, x)).

# Build a 25-row DataFrame: x is a string column, y a long column
df = (spark.range(5, numPartitions=4).toDF("x")
    .selectExpr("cast(x as string)")
    .crossJoin(spark.range(5, numPartitions=4).toDF("y")))

# Hash-partition on (y, x) and count how many rows land in each partition
df.repartition(4, "y", "x").rdd.glom().map(len).collect()

[8, 6, 9, 2]

# Same data hash-partitioned on (x, y): the distribution differs
df.repartition(4, "x", "y").rdd.glom().map(len).collect()

[6, 4, 3, 12]
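
As a follow-up sketch (reusing df from above): pyspark.sql.functions.hash exposes the same Murmur3 hash that hashpartitioning applies, and the target partition is that hash modulo the partition count, so swapping the column order changes where a row lands:

from pyspark.sql import functions as F

# Same rows, different column order => different hashes, hence
# (hash mod numPartitions) can pick a different partition
df.select(F.hash("x", "y").alias("hash_xy"),
          F.hash("y", "x").alias("hash_yx")).show(5)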

Does each chunk extracted for parallel execution contain the same data as would have been in each group had we run a SQL query with GROUP BY on the same columns?

It depends on what the exact question is (see the sketch after the list below):

  • Yes. GROUP BY with the same set of columns will result in the same logical distribution of keys over partitions.
  • No. A hash partitioner can map multiple keys to the same partition, whereas GROUP BY "sees" only the actual groups.
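
A sketch of that distinction, reusing the 25-row df from above (the exact partition sizes are illustrative):

# Two partitions cannot hold 25 distinct (x, y) keys separately:
# the hash partitioner maps many keys into each partition
print(df.repartition(2, "x", "y").rdd.glom().map(len).collect())
# e.g. [14, 11]

# GROUP BY still yields exactly one row per distinct key
print(df.groupBy("x", "y").count().count())  # 25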

Related: How to define partitioning of DataFrame?
