JDBC to Spark Dataframe - How to ensure even partitioning?


Question

I am new to Spark, and am working on creating a DataFrame from a Postgres database table via JDBC, using spark.read.jdbc.

I am a bit confused about the partitioning options, in particular partitionColumn, lowerBound, upperBound, and numPartitions.

  • The documentation seems to indicate that these fields are optional. What happens if I don't provide them?
  • How does Spark know how to partition the queries? How efficient will that be?
  • If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Let's say I'm going to have 20 executors, so I set my numPartitions to 20.
My partitionColumn is an auto-incremented ID field, and let's say the values range from 1 to 2,000,000.
However, because the user selects to process some really old data along with some really new data, with nothing in the middle, most of the data has ID values either under 100,000 or over 1,900,000.
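
For concreteness, a read along the lines of the following is what is being described (the table name events, the column name id, jdbcUrl and connectionProperties are only illustrative placeholders):

// Illustrative sketch of the read in question: 20 partitions on an
// auto-incremented id column covering 1..2,000,000.
val df = spark.read.jdbc(
  url = jdbcUrl,
  table = "events",
  columnName = "id",              // partitionColumn
  lowerBound = 1L,
  upperBound = 2000000L,
  numPartitions = 20,
  connectionProperties = connectionProperties)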

  • Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

If so, is there a way to prevent this?

Answer

What are all these options: spark.read.jdbc refers to reading a table from an RDBMS.

Parallelism is the power of Spark; to achieve it here, you have to specify all of these options.

Questions :-)

1) The documentation seems to indicate that these fields are optional. What happens if I don't provide them?

Answer: default parallelism, i.e. poor parallelism. Without these options, Spark reads the whole table through a single JDBC connection into a single partition.

Depending on the scenario, the developer has to take care of the performance tuning strategy and make sure the data is split across boundaries (a.k.a. partitions), which in turn become parallel tasks.
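
As a quick way to see the default behaviour (a minimal sketch; jdbcUrl, user and password are placeholders), a read without any of the partitioning options comes back as a single partition:

// Minimal sketch: no partitionColumn/lowerBound/upperBound/numPartitions given,
// so the whole table is fetched over one connection into one partition.
val plainDf = spark.read.format("jdbc").
  option("url", jdbcUrl).
  option("dbtable", "employees").
  option("user", user).
  option("password", password).load()

println(plainDf.rdd.getNumPartitions)   // prints 1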

2) How does Spark know how to partition the queries? How efficient will that be?

JDBC reads - referring to the Databricks documentation:

You can provide split boundaries based on the dataset's column values.

  • These options specify the parallelism on read.
  • If any of these options is specified, all of them must be specified.

Note

These options specify the parallelism of the table read. lowerBound and upperBound decide the partition stride, but do not filter the rows in the table. Therefore, Spark partitions and returns all rows in the table.

Example 1:
You can split the table read across executors on the emp_no column using the partitionColumn, lowerBound, upperBound, and numPartitions parameters.

// Split the read of the employees table into 100 partitions on emp_no,
// covering key values 1 to 100000; connectionProperties holds user/password/driver.
val df = spark.read.jdbc(
  url = jdbcUrl,
  table = "employees",
  columnName = "emp_no",          // the partition column
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 100,
  connectionProperties = connectionProperties)

Also, numPartitions is the number of parallel connections you are asking the RDBMS to open to read the data. By providing numPartitions you cap the number of connections, so you don't exhaust the connections on the RDBMS side.
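
The same read can also be written in the option-based style used later in this answer (a sketch; the URL and credentials are placeholders):

// Equivalent option-based form of Example 1.
val partitionedDf = spark.read.format("jdbc").
  option("url", jdbcUrl).
  option("dbtable", "employees").
  option("partitionColumn", "emp_no").
  option("lowerBound", "1").
  option("upperBound", "100000").
  option("numPartitions", "100").   // also the cap on parallel JDBC connections
  option("user", user).
  option("password", password).load()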

Example 2 (source):

The last four arguments in that map are there for the purpose of getting a partitioned dataset. If you pass any of them, you have to pass all of them.

Here is what it does when you pass in these additional arguments:

It builds statements of the form

SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
{partitionColumn} < ?

It sends {numPartitions} statements to the DB engine. If you supplied these values: {dbTable=ExampleTable, lowerBound=1, upperBound=10,000, numPartitions=10, partitionColumn=KeyColumn}, it would create these ten statements:

SELECT * FROM ExampleTable WHERE KeyColumn < 1001
SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2001
SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3001
SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4001
SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5001
SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6001
SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7001
SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8001
SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9001
SELECT * FROM ExampleTable WHERE KeyColumn >= 9001
And then it would put the results of each of those queries in its own partition in Spark.
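
As a rough illustration (a sketch of the idea, not the exact Spark internals), the ranges above can be reproduced by splitting [lowerBound, upperBound) into numPartitions strides, with the first and last ranges left open-ended so that keys below lowerBound or above upperBound are still read:

// Illustrative only: derive the per-partition WHERE clauses from the bounds.
def partitionClauses(column: String, lower: Long, upper: Long, n: Int): Seq[String] = {
  val stride = upper / n - lower / n
  (0 until n).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0)          s"$column < $hi"                  // open at the bottom
    else if (i == n - 1) s"$column >= $lo"                 // open at the top
    else                 s"$column >= $lo AND $column < $hi"
  }
}

partitionClauses("KeyColumn", 1L, 10000L, 10).foreach(println)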


Questions :-)

If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

If so, is there a way to prevent this?


All of these questions have one answer.

Below is the approach. 1) You need to understand how many records/rows end up in each partition; based on that, you can repartition or coalesce.

Snippet 1 (Spark 1.6 and later):
Spark provides a way to find out how many records land in each partition.

spark_partition_id() lives in org.apache.spark.sql.functions.

import org.apache.spark.sql.functions._
// df is the DataFrame read from the RDBMS via spark.read.jdbc
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show

Snippet 2: for all versions of Spark

// Requires import spark.implicits._ for .toDF on an RDD of tuples
df
  .rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition_number", "NumberOfRecordsPerPartition")
  .show

Then you need to adjust your strategy accordingly: tune the query ranges, repartition, and so on. You can also use mapPartitions or foreachPartition for per-partition processing.
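
For example, once the counts reveal the skew described in the question, a hash repartition on the key column spreads the rows evenly across the executors (the column name id is only an assumption taken from the question; coalesce(n) is the cheaper alternative when you only want to reduce the partition count without a full shuffle):

import org.apache.spark.sql.functions.col

// Redistribute the skewed JDBC read evenly across 20 partitions.
val balanced = df.repartition(20, col("id"))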

Conclusion: I prefer using the given options on a numeric column, since I have seen them divide the data uniformly across boundaries/partitions.

Sometimes it may not be possible to use these options, and then manually tuning the partitions/parallelism is required...


Update:

With the following approach we can achieve an even distribution:

  1. Get the primary key of the table.
  2. Find the minimum and maximum values of the key.
  3. Run the Spark read with those values.


def main(args: Array[String]) {
  // parsing input parameters ...

  // executeQuery is a small helper that runs a SQL statement over plain JDBC
  // and returns the first row of the result set.
  val primaryKey = executeQuery(url, user, password,
    s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
  val result = executeQuery(url, user, password,
    s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
  val min = result.getString(1).toInt
  val max = result.getString(2).toInt
  // aim for roughly 5000 key values per partition
  val numPartitions = (max - min) / 5000 + 1

  val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
  var df = spark.read.format("jdbc").
    option("url", s"${url}${config("schema")}").
    option("driver", "com.mysql.jdbc.Driver").
    option("lowerBound", min.toString).
    option("upperBound", max.toString).
    option("numPartitions", numPartitions.toString).
    option("partitionColumn", primaryKey).
    option("dbtable", config("table")).
    option("user", user).
    option("password", password).load()

  // some data manipulations here ...

  df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
}

