Spark: Difference between numPartitions in read.jdbc(..numPartitions..) and repartition(..numPartitions..)


Question

I'm perplexed by the behaviour of the numPartitions parameter in the following methods:

  1. DataFrameReader.jdbc
  2. Dataset.repartition

From the docs of DataFrameReader.jdbc:

numPartitions: the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly.

From the docs of Dataset.repartition:

Returns a new Dataset that has exactly numPartitions partitions.


My current understanding:

  1. The numPartitions parameter in the DataFrameReader.jdbc method controls the degree of parallelism when reading data from the database.
  2. The numPartitions parameter in Dataset.repartition controls the number of output files that will be generated when the DataFrame is written to disk (a minimal sketch of this scenario follows below).
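For context, here is a minimal sketch of the scenario in question; the Spark API calls are real, but the connection URL, credentials, table name, column name and bounds are hypothetical placeholders:

    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("jdbc-numPartitions").getOrCreate()

    val props = new Properties()
    props.setProperty("user", "user")
    props.setProperty("password", "password")

    // Read with 8 partition strides over column "id" in [1, 100000)
    val df = spark.read.jdbc(
      "jdbc:mysql://db-host:3306/mydb", "employees",
      "id", 1L, 100000L, 8, props)

    // ...then write to disk without calling repartition first.
    df.write.parquet("/tmp/employees")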


My questions:

  1. If I read a DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking the repartition method), would there still be as many files in the output as there would have been had I written the DataFrame to disk after invoking repartition on it?
  2. If the answer to the above question is:
    • Yes: then is it redundant to invoke the repartition method on a DataFrame that was read using the DataFrameReader.jdbc method (with the numPartitions parameter)?
    • No: then please correct the lapses in my understanding. Also, in that case, shouldn't the numPartitions parameter of the DataFrameReader.jdbc method be called something like 'parallelism'?

Answer

Short answer: There is (almost) no difference in the behaviour of the numPartitions parameter in the two methods.

read.jdbc(..numPartitions..)

Here, the numPartitions parameter controls:

  1. The number of parallel connections that are made to MySQL (or any other RDBMS) for reading the data into the DataFrame.
  2. The degree of parallelism of all subsequent operations on the read DataFrame, including writing to disk, until the repartition method is invoked on it (see the sketch below).
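Concretely, reusing the hypothetical read from the sketch in the question (the "salary" column is likewise made up), the 8 strides translate into 8 bounded queries, and that partition count carries through subsequent transformations:

    import org.apache.spark.sql.functions.col

    // Spark issues roughly one bounded query per stride, e.g.:
    //   SELECT * FROM employees WHERE id < 12500                  -- partition 0
    //   SELECT * FROM employees WHERE id >= 12500 AND id < 24999  -- partition 1
    //   ...
    println(df.rdd.getNumPartitions)  // 8

    // Subsequent operations inherit the same parallelism:
    val highPaid = df.filter(col("salary") > 1000)
    println(highPaid.rdd.getNumPartitions)  // still 8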


repartition(..numPartitions..)

Here, the numPartitions parameter controls the degree of parallelism that will be exhibited in performing any operation on the DataFrame, including writing to disk.
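For instance, a runnable sketch with a locally generated stand-in DataFrame (the output path is arbitrary):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("repartition-demo").master("local[*]").getOrCreate()

    // A stand-in DataFrame; any source behaves the same way here.
    val df = spark.range(0, 1000000).toDF("id")

    // Redistribute into exactly 16 partitions; this triggers a shuffle.
    val repartitioned = df.repartition(16)
    println(repartitioned.rdd.getNumPartitions)  // 16

    // The write runs as 16 tasks and produces ~16 part files.
    repartitioned.write.mode("overwrite").parquet("/tmp/repartition_demo")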

So basically the DataFrame obtained on reading a MySQL table using the spark.read.jdbc(..numPartitions..) method behaves the same (exhibits the same degree of parallelism in operations performed over it) as one that was read without parallelism and had the repartition(..numPartitions..) method invoked on it afterwards (obviously with the same value of numPartitions).
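A sketch of that equivalence, assuming an existing SparkSession spark, connection Properties props, and the same hypothetical table as above; both paths end with 8 partitions, although path A parallelizes the read itself while path B reads over a single connection and then shuffles:

    val url = "jdbc:mysql://db-host:3306/mydb"  // hypothetical

    // Path A: parallel read with 8 partition strides over column "id"
    val a = spark.read.jdbc(url, "employees", "id", 1L, 100000L, 8, props)

    // Path B: single-connection read, followed by an explicit repartition
    val b = spark.read.jdbc(url, "employees", props).repartition(8)

    println(a.rdd.getNumPartitions)  // 8
    println(b.rdd.getNumPartitions)  // 8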

To answer the exact questions:

If I read a DataFrame via DataFrameReader.jdbc and then write it to disk (without invoking the repartition method), would there still be as many files in the output as there would have been had I written the DataFrame to disk after invoking repartition on it?

Assuming that the read task has been parallelized by providing the appropriate parameters (columnName, lowerBound, upperBound and numPartitions), all operations on the resulting DataFrame, including write, will be performed in parallel. Quoting the official docs here:

numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
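On the write side, the same option caps the number of concurrent JDBC connections; a sketch with a hypothetical destination table:

    // If df currently has more than 4 partitions, Spark effectively
    // coalesces it down to 4 before writing, so at most 4 concurrent
    // JDBC connections are opened against the target database.
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://db-host:3306/mydb")
      .option("dbtable", "employees_copy")
      .option("user", "user")
      .option("password", "password")
      .option("numPartitions", "4")
      .save()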


Yes: then is it redundant to invoke the repartition method on a DataFrame that was read using the DataFrameReader.jdbc method (with the numPartitions parameter)?

Unless you invoke one of the other variations of the repartition method (the ones that take columnExprs parameters, sketched below), invoking repartition on such a DataFrame (with the same numPartitions value) is redundant. However, I'm not sure whether forcing the same degree of parallelism on an already-parallelized DataFrame also shuffles data among executors unnecessarily. I will update the answer once I come across it.
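Those column-based variants are not redundant, because they change how rows are distributed across partitions, not merely how many partitions exist; a brief sketch (the department column is hypothetical):

    import org.apache.spark.sql.functions.col

    // Hash-partition by "department" into 8 partitions: all rows with the
    // same department land in the same partition, a guarantee that a plain
    // repartition(8) (round-robin distribution) does not give you.
    val byDept = df.repartition(8, col("department"))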
