Spark: Difference between numPartitions in read.jdbc(..numPartitions..) and repartition(..numPartitions..)


Problem description


I'm confused about the behaviour of the numPartitions parameter in the following methods:

  1. DataFrameReader.jdbc
  2. Dataset.repartition

The official docs of DataFrameReader.jdbc say the following regarding the numPartitions parameter:

numPartitions: the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly.
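The stride logic described above can be sketched in plain Python. This is a simplified model of the WHERE-clause predicates Spark generates from (lowerBound, upperBound, numPartitions), not Spark's exact implementation; the column name and bounds are illustrative placeholders:

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Rough model of how Spark turns (lowerBound, upperBound, numPartitions)
    into one WHERE-clause predicate per partition: a fixed stride over the
    range, with open-ended first and last partitions so no rows are dropped."""
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        if i == 0:
            # First partition also picks up NULLs and anything below lowerBound
            predicates.append(f"{column} < {current + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is unbounded above, catching rows >= upperBound too
            predicates.append(f"{column} >= {current}")
        else:
            predicates.append(f"{column} >= {current} AND {column} < {current + stride}")
        current += stride
    return predicates

# Four partitions over a hypothetical id column in [0, 100):
for p in jdbc_partition_predicates("id", 0, 100, 4):
    print(p)
```

Each predicate becomes one parallel read task (one JDBC connection), which is why lowerBound/upperBound only shape the split and do not filter rows.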

And the official docs of Dataset.repartition say:

Returns a new Dataset that has exactly numPartitions partitions.


My current understanding:

  1. The numPartitions parameter in the DataFrameReader.jdbc method controls the degree of parallelism in reading the data from the database
  2. The numPartitions parameter in Dataset.repartition controls the number of output files that will be generated when this DataFrame is written to disk


My questions:

  1. If I read DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking repartition method), then would there still be as many files in output as there would've been had I written out a DataFrame to disk after having invoked repartition on it?
  2. If the answer to the above question is:
    • Yes: Then is it redundant to invoke repartition method on a DataFrame that was read using DataFrameReader.jdbc method (with numPartitions parameter)?
    • No: Then please correct the lapses in my understanding. Also in that case shouldn't the numPartitions parameter of DataFrameReader.jdbc method be called something like 'parallelism'?

Solution

Short answer: There is (almost) no difference in behaviour of numPartitions parameter in the two methods


read.jdbc(..numPartitions..)

Here, the numPartitions parameter controls:

  1. The number of parallel connections that will be made to MySQL (or any other RDBMS) for reading the data into the DataFrame.
  2. The degree of parallelism of all subsequent operations on the read DataFrame, including writing to disk, until the repartition method is invoked on it


repartition(..numPartitions..)

Here the numPartitions parameter controls the degree of parallelism that will be exhibited in performing any operation on the DataFrame, including writing to disk.
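A toy model of what "exactly numPartitions partitions" means: repartition without column expressions performs a full shuffle that redistributes rows roughly evenly (round-robin style) across the requested number of partitions. This sketch only illustrates the resulting partition layout, not Spark's actual shuffle machinery:

```python
def repartition(rows, num_partitions):
    """Toy model of repartition(n): redistribute rows round-robin so that
    the result has exactly num_partitions partitions of near-equal size."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions

parts = repartition(list(range(10)), 4)
print(len(parts))              # exactly 4 partitions
print([len(p) for p in parts])  # near-equal sizes
```

Each of those partitions would then be processed by one task, which is what ties the partition count to the parallelism of every downstream operation, including the number of output files on write.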


So basically the DataFrame obtained by reading a MySQL table using the spark.read.jdbc(..numPartitions..) method behaves the same (exhibits the same degree of parallelism in operations performed over it) as if it had been read without parallelism and the repartition(..numPartitions..) method had been invoked on it afterwards (obviously with the same value of numPartitions).


To answer the exact questions:

If I read DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking repartition method), then would there still be as many files in output as there would've been had I written out a DataFrame to disk after having invoked repartition on it?

Yes

Assuming that the read task has been parallelized by providing the appropriate parameters (columnName, lowerBound, upperBound & numPartitions), all operations on the resulting DataFrame, including write, will be performed in parallel. Quoting the official docs here:

numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
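The write-side rule quoted above reduces to taking a minimum: since Spark calls coalesce(numPartitions) before writing only when the DataFrame has more partitions than the limit, and coalesce never increases the partition count, the effective number of write partitions (and hence concurrent JDBC connections) is the smaller of the two. A sketch of that rule, assuming a hypothetical helper rather than Spark's actual code:

```python
def effective_write_partitions(df_partitions, jdbc_num_partitions=None):
    """Model of the documented rule: when writing over JDBC, Spark coalesces
    down to numPartitions if the DataFrame has more partitions than that;
    a DataFrame with fewer partitions is written as-is (coalesce only shrinks)."""
    if jdbc_num_partitions is None:
        return df_partitions  # no numPartitions option set: no cap applied
    return min(df_partitions, jdbc_num_partitions)

print(effective_write_partitions(8, 4))  # coalesced down to the limit
print(effective_write_partitions(2, 4))  # below the limit: left unchanged
print(effective_write_partitions(8))     # option not set: left unchanged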


Yes: Then is it redundant to invoke repartition method on a DataFrame that was read using DataFrameReader.jdbc method (with numPartitions parameter)?

Yes

Unless you invoke one of the other variants of the repartition method (the ones that take a columnExprs parameter), invoking repartition on such a DataFrame (with the same numPartitions value) is redundant. However, I'm not sure whether forcing the same degree of parallelism on an already-parallelized DataFrame also shuffles data among executors unnecessarily. I will update the answer once I come across it.
