How to operate numPartitions, lowerBound, upperBound in the spark-jdbc connection?


Problem description

I am trying to read a table on a Postgres DB using spark-jdbc. For that, I have come up with the following code:

import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {
  var conf = new SparkConf().setAppName("Spark-JDBC")
                            .set("spark.executor.heartbeatInterval", "120s")
                            .set("spark.network.timeout", "12000s")
                            .set("spark.default.parallelism", "20")
  val log = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)
  val conFile       = "/home/myuser/ReconTest/inputdir/testconnection.properties"
  val properties    = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "base.ledgers"

  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      // printStackTrace() returns Unit, so log the exception itself instead
      log.error("Exception: " + e.getMessage, e)
      System.exit(1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
    import spark.implicits._
    val gpTable = spark.read.format("jdbc")
                            .option("url", connectionUrl)
                            .option("dbtable", tableName)
                            .option("user", devUserName)
                            .option("password", devPassword)
                            .load()
    val rc = gpTable.filter(gpTable("source_system_name") === "ORACLE" && gpTable("period_year") === "2017").count()
    println("gpTable Count: " + rc)
  }
}

Right now, I am fetching the row count just to check whether the connection succeeds or fails. It is a huge table, and getting the count runs slowly, which I understand, since I have not supplied the number of partitions or the column on which the data should be partitioned.

In a lot of places, I see the jdbc object created in the following way:

val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties) 

I created it in another format, using options. I am unable to understand how to pass numPartitions and the name of the partition column on which I want the data to be partitioned when the jdbc connection is formed using 'options': val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
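For reference, DataFrameReader also exposes an overload of spark.read.jdbc that takes the partitioning parameters directly. A minimal sketch, where the column name "id" and the bounds are placeholders rather than values from the actual table:

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", devUserName)
connectionProperties.put("password", devPassword)

// Sketch only: "id" must be an integral column of the table, and the bounds
// should reflect its real min/max values; the numbers below are placeholders.
val partitionedTable = spark.read.jdbc(
  connectionUrl,
  tableName,
  "id",        // columnName: the column to partition on
  1L,          // lowerBound
  1000000L,    // upperBound
  10,          // numPartitions
  connectionProperties
)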

Could someone let me know:

  1. How do I add the parameters numPartitions, lowerBound, upperBound to the jdbc object written in this way:

val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()

  2. How do I add just the partition column name and numPartitions? I want to fetch all the rows from the year 2017, and I don't want a range of rows to be picked (lowerBound, upperBound).

Recommended answer

The options numPartitions, lowerBound, upperBound and partitionColumn control the parallel read in Spark. You need an integral (numeric) column for partitionColumn. If there is no suitable column in your table, you can use ROW_NUMBER as your partition column.

Try this:

val rowCount = spark.read.format("jdbc")
                         .option("url", connectionUrl)
                         .option("dbtable", s"(select count(*) as cnt from $tableName where source_system_name = 'ORACLE' and period_year = '2017') as tmp")
                         .option("user", devUserName)
                         .option("password", devPassword)
                         .load()
                         .collect()
                         .map(row => row.getAs[Long]("cnt"))   // count(*) comes back as bigint from Postgres
                         .head

This gives the number of rows matching the predicate, which can then be used as the upperBound.

val gpTable = spark.read.format("jdbc")
                        .option("url", connectionUrl)
                        .option("dbtable", s"(select ROW_NUMBER() OVER() as rno, * from $tableName where source_system_name = 'ORACLE' and period_year = '2017') as tmp")
                        .option("user", devUserName)
                        .option("password", devPassword)
                        .option("numPartitions", 10)
                        .option("partitionColumn", "rno")
                        .option("lowerBound", 1)
                        .option("upperBound", rowCount)
                        .load()
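To sanity-check that the read is actually split, the number of partitions on the resulting DataFrame can be inspected; a small sketch using the gpTable value built above:

// Each JDBC partition is read over its own connection against Postgres,
// so with the options above this should print 10.
println("JDBC partitions: " + gpTable.rdd.getNumPartitions)

// The count now runs in parallel across those partitions.
println("gpTable Count: " + gpTable.count())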

numPartitions determines the number of parallel connections opened against your Postgres DB. Adjust it according to the degree of parallelism you need while reading from the database.
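As a side note, if computing a ROW_NUMBER-based upper bound is not desirable, DataFrameReader also offers a jdbc overload that takes an array of predicate strings, one per partition, so no lowerBound/upperBound is needed. A minimal sketch, where splitting 2017 by a period_month column is purely an assumption about the table:

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", devUserName)
connectionProperties.put("password", devPassword)

// Hypothetical split: assumes the table has a period_month column; each
// predicate string becomes the WHERE clause of one partition's query.
val predicates = (1 to 12).map { m =>
  s"source_system_name = 'ORACLE' AND period_year = '2017' AND period_month = $m"
}.toArray

val gpTableByMonth = spark.read.jdbc(connectionUrl, tableName, predicates, connectionProperties)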
