JDBC to Spark Dataframe - How to ensure even partitioning?


Question


I am new to Spark, and am working on creating a DataFrame from a Postgres database table via JDBC, using spark.read.jdbc.

I am a bit confused about the partitioning options, in particular partitionColumn, lowerBound, upperBound, and numPartitions.


  • The documentation seems to indicate that these fields are optional. What happens if I don't provide them?
  • How does Spark know how to partition the queries? How efficient will that be?
  • If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Let's say I'm going to have 20 executors, so I set my numPartitions to 20.
My partitionColumn is an auto-incremented ID field, and let's say the values range from 1 to 2,000,000.
However, because the user chooses to process some really old data along with some really new data, with nothing in the middle, most of the data has ID values either under 100,000 or over 1,900,000.

  • Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

  • If so, is there a way to prevent this?

Solution

What are all these options? spark.read.jdbc refers to reading a table from an RDBMS over JDBC.

Parallelism is the power of Spark; in order to achieve it you have to specify all of these options.

Question[s] :-)

1) The documentation seems to indicate that these fields are optional. What happens if I don't provide them?

Answer: default parallelism, which in practice means poor parallelism (the whole table is read through a single connection into a single partition).

Based on the scenario, the developer has to take care of the performance-tuning strategy and ensure the data splits across boundaries (aka partitions), which in turn become parallel tasks. That is the way to look at it.
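As a quick sanity check (a minimal sketch, assuming jdbcUrl and connectionProperties are already defined as in Example 1 below), a read without any of the partitioning options ends up in a single partition, i.e. one JDBC connection does all the work:

// Sketch: no partitionColumn/lowerBound/upperBound/numPartitions supplied,
// so all rows are fetched through one connection into one partition.
val unpartitioned = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
println(unpartitioned.rdd.getNumPartitions)   // typically prints 1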

2) How does Spark know how to partition the queries? How efficient will that be?

jdbc-reads, referring to the Databricks docs:

You can provide split boundaries based on the dataset’s column values.

  • These options specify the parallelism on read.
  • These options must all be specified if any of them is specified.

Note

These options specify the parallelism of the table read. lowerBound and upperBound decide the partition stride, but do not filter the rows in the table. Therefore, Spark partitions and returns all rows in the table.

Example 1:
You can split the table read across executors on the emp_no column using the partitionColumn, lowerBound, upperBound, and numPartitions parameters.

val df = spark.read.jdbc(url=jdbcUrl,
    table="employees",
    columnName="emp_no",
    lowerBound=1L,
    upperBound=100000L,
    numPartitions=100,
    connectionProperties=connectionProperties)

Also, numPartitions is the number of parallel connections you are asking the RDBMS for when reading the data. By providing numPartitions you limit the number of connections, so that you do not exhaust the connections on the RDBMS side.
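For example (a hedged sketch: maxDbConnections is an assumed limit on the database side, not something Spark knows about), numPartitions can be derived from both the cluster parallelism and the connection budget of the RDBMS:

// Sketch: cap the JDBC read parallelism so the RDBMS connection pool is not exhausted.
val maxDbConnections = 20                                        // assumed DB-side connection budget
val clusterParallelism = spark.sparkContext.defaultParallelism
val safeNumPartitions = math.min(clusterParallelism, maxDbConnections)

val dfCapped = spark.read.jdbc(
  url = jdbcUrl,
  table = "employees",
  columnName = "emp_no",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = safeNumPartitions,
  connectionProperties = connectionProperties)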

Example 2, source: a DataStax presentation on loading Oracle data into Cassandra:

val basePartitionedOracleData = sqlContext
  .read
  .format("jdbc")
  .options(
    Map[String, String](
      "url" -> "jdbc:oracle:thin:username/password@//hostname:port/oracle_svc",
      "dbtable" -> "ExampleTable",
      "lowerBound" -> "1",
      "upperBound" -> "10000",
      "numPartitions" -> "10",
      "partitionColumn" -> "KeyColumn"
    )
  )
  .load()

The last four arguments in that map are there for the purpose of getting a partitioned dataset. If you pass any of them, you have to pass all of them.

When you pass these additional arguments in, here’s what it does:

It builds a SQL statement template in the format

SELECT * FROM {tableName} WHERE {partitionColumn} >= ? AND
{partitionColumn} < ?

It sends {numPartitions} statements to the DB engine. If you supplied these values: {dbTable=ExampleTable, lowerBound=1, upperBound=10,000, numPartitions=10, partitionColumn=KeyColumn}, it would create these ten statements:

SELECT * FROM ExampleTable WHERE KeyColumn >= 1 AND KeyColumn < 1001
SELECT * FROM ExampleTable WHERE KeyColumn >= 1001 AND KeyColumn < 2000
SELECT * FROM ExampleTable WHERE KeyColumn >= 2001 AND KeyColumn < 3000
SELECT * FROM ExampleTable WHERE KeyColumn >= 3001 AND KeyColumn < 4000
SELECT * FROM ExampleTable WHERE KeyColumn >= 4001 AND KeyColumn < 5000
SELECT * FROM ExampleTable WHERE KeyColumn >= 5001 AND KeyColumn < 6000
SELECT * FROM ExampleTable WHERE KeyColumn >= 6001 AND KeyColumn < 7000
SELECT * FROM ExampleTable WHERE KeyColumn >= 7001 AND KeyColumn < 8000
SELECT * FROM ExampleTable WHERE KeyColumn >= 8001 AND KeyColumn < 9000
SELECT * FROM ExampleTable WHERE KeyColumn >= 9001 AND KeyColumn < 10000
And then it would put the results of each of those queries in its own partition in Spark.
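For intuition, here is a simplified sketch of how those per-partition predicates can be derived from lowerBound, upperBound and numPartitions. It is not Spark's exact internal code (Spark also handles stride rounding and NULL ordering), but it shows why the first and last partitions are open-ended, and therefore why rows outside the bounds are never filtered out:

// Simplified sketch of the predicate generation; not Spark's exact implementation.
def partitionPredicates(column: String,
                        lowerBound: Long,
                        upperBound: Long,
                        numPartitions: Int): Seq[String] = {
  val stride = (upperBound - lowerBound) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lowerBound + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi OR $column IS NULL"          // first partition: open below
    else if (i == numPartitions - 1) s"$column >= $lo"       // last partition: open above
    else s"$column >= $lo AND $column < $hi"
  }
}

// e.g. partitionPredicates("KeyColumn", 1L, 10000L, 10) yields ten predicates
// with a stride of roughly 1,000 ids each.
partitionPredicates("KeyColumn", 1L, 10000L, 10).foreach(println)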


Question[s] :-)

If I DO specify these options, how do I ensure that the partition sizes are roughly even if the partitionColumn is not evenly distributed?

Will my 1st and 20th executors get most of the work, while the other 18 executors sit there mostly idle?

If so, is there a way to prevent this?


All of these questions have one answer.

Below is the way... 1) You need to understand how many records/rows there are per partition; based on this you can repartition or coalesce.

Snippet 1: Spark 1.6 and later (including Spark 2.x) provides a facility to know how many records there are in a partition.

spark_partition_id() exists in org.apache.spark.sql.functions

import org.apache.spark.sql.functions._

// df is your DataFrame read from the RDBMS via spark.read.jdbc
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show()
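For the scenario described in the question (IDs from 1 to 2,000,000 split into 20 partitions, with most rows below 100,000 or above 1,900,000), the stride is roughly 100,000 per partition, so this check would most likely show nearly all of the rows landing in the first and the last partition, confirming the suspected skew.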

Snippet 2: for all versions of Spark

// requires: import spark.implicits._ (for .toDF on an RDD of tuples)
df
  .rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition_number", "NumberOfRecordsPerPartition")
  .show()

And then you need to incorporate your strategy again: tune the query ranges, or repartition, etc. (see the sketch below); you can also process the balanced partitions with mapPartitions or foreachPartition.
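A minimal sketch, assuming df is the skewed DataFrame read via spark.read.jdbc above: repartition() performs a full shuffle and produces roughly even partitions regardless of the key skew, while coalesce() only merges existing partitions without a shuffle and therefore cannot split the two heavy ones apart.

import spark.implicits._

// Shuffle into 20 roughly equal partitions, independent of the skewed key distribution.
val evened = df.repartition(20)

// Re-check the per-partition row counts after the shuffle.
evened.rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition_number", "NumberOfRecordsPerPartition")
  .show()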

Conclusion: I prefer using the given options, which work on numeric columns, since I have seen them divide the data uniformly across boundaries/partitions.

Sometimes it may not be possible to use these options; then the partitions/parallelism need to be tuned manually...


Update:

With the approach below we can achieve a uniform distribution...

  1. Fetch the primary key of the table.
  2. Find the key minimum and maximum values.
  3. Execute Spark with those values.


def main(args: Array[String]) {
  // parsing input parameters ...

  // 1. Fetch the primary key of the table
  val primaryKey = executeQuery(url, user, password,
    s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)

  // 2. Find the key minimum and maximum values
  val result = executeQuery(url, user, password,
    s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
  val min = result.getString(1).toInt
  val max = result.getString(2).toInt
  val numPartitions = (max - min) / 5000 + 1

  // 3. Execute Spark with those values
  val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
  val df = spark.read.format("jdbc")
    .option("url", s"${url}${config("schema")}")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("lowerBound", min)
    .option("upperBound", max)
    .option("numPartitions", numPartitions)
    .option("partitionColumn", primaryKey)
    .option("dbtable", config("table"))
    .option("user", user)
    .option("password", password)
    .load()

  // some data manipulations here ...
  df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
}
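Note that (max - min) / 5000 + 1 simply targets a stride of about 5,000 key values per partition; with the question's key range of 1 to 2,000,000 that works out to roughly 400 read partitions, and the final repartition(10) then shuffles them into 10 evenly sized partitions before the Parquet write.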
