How to optimize partitioning when migrating data from a JDBC source?


Problem description


I am trying to move data from a table in PostgreSQL to a Hive table on HDFS. To do that, I came up with the following code:

  val conf = new SparkConf().setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.sql.orc.filterPushdown", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max", "512m")
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.yarn.driver.memoryOverhead", "7168")
    .set("spark.yarn.executor.memoryOverhead", "7168")
    .set("spark.sql.shuffle.partitions", "61")
    .set("spark.default.parallelism", "60")
    .set("spark.memory.storageFraction", "0.5")
    .set("spark.memory.fraction", "0.6")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "16g")
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

The data is inserted into the Hive table, dynamically partitioned based on prtn_String_columns: source_system_name, period_year, period_num

Spark-submit used:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

The following error messages are generated in the executor logs:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

I see in the logs that the read is being executed properly with the given number of partitions as below:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

Below is the state of executors in stages:

The data is not being partitioned properly. One partition is smaller while the other one becomes huge. There is a skew problem here. While inserting the data into the Hive table, the job fails at the line spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF"), but I understand this is happening because of the data skew problem.

I tried increasing the number of executors, the executor memory, and the driver memory, and also tried saving the data as a CSV file instead of writing the DataFrame to a Hive table, but nothing stops the execution from failing with the exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Is there anything in the code that I need to correct? Could anyone let me know how I can fix this problem?

Solution

  1. Determine how many partitions you need given the amount of input data and your cluster resources. As a rule of thumb it is better to keep the partition input under 1GB unless strictly necessary, and it should also be strictly smaller than the block size limit.

    You've previously stated that you migrate 1TB of data, so the values you use in different posts (5 - 70) are likely way too low to ensure a smooth process.

    Try to use a value which won't require further repartitioning; a rough sizing sketch follows below.
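
    As a rough sizing sketch (the ~1TB figure comes from the question; the 256MB target per partition is an assumption chosen to stay well under the 1GB guideline above):

      // Rough, back-of-the-envelope estimate; both inputs below are assumptions.
      val totalInputBytes         = 1024L * 1024 * 1024 * 1024   // ~1 TB, as stated in the question
      val targetBytesPerPartition = 256L * 1024 * 1024            // 256 MB, well under the 1 GB guideline
      val numPartitions = math.ceil(totalInputBytes.toDouble / targetBytesPerPartition).toInt
      // numPartitions == 4096 here; round to whatever your cluster can realistically run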

  2. Know your data.

    Analyze the columns available in the dataset to determine if there are any columns with high cardinality and a uniform distribution that can be distributed among the desired number of partitions. These are good candidates for the import process. Additionally, you should determine the exact range of values (a JDBC-based sketch for this follows the width_bucket query below).

    Aggregations with different centrality and skewness measures, as well as histograms and basic counts-by-key, are good exploration tools. For this part it is better to analyze the data directly in the database instead of fetching it to Spark.

    Depending on the RDBMS, you might be able to use width_bucket (PostgreSQL, Oracle) or an equivalent function to get a decent idea of how the data will be distributed in Spark after loading with partitionColumn, lowerBound, upperBound and numPartitions:

    s"""(SELECT width_bucket($partitionColumn, $lowerBound, $upperBound, $numPartitions) AS bucket, COUNT(*)
         FROM t
         GROUP BY bucket) AS tmp"""
    
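
    A minimal sketch of checking the exact range directly over JDBC, using cast_id and the credentials from the question as stand-ins for whatever column and connection you actually pick; adapt the getter if the column is not an integer-like type:

      val boundsQuery =
        """(SELECT min(cast_id) AS lo, max(cast_id) AS hi
            FROM schema.tablename
            WHERE period_year = '2017' AND period_num = '12') AS bounds"""

      val boundsRow = spark.read.format("jdbc")
        .option("url", connectionUrl)
        .option("user", devUserName)
        .option("password", devPassword)
        .option("dbtable", boundsQuery)
        .load()
        .first()

      val lowerBound = boundsRow.getAs[Number]("lo").longValue   // assumes an integer-like column
      val upperBound = boundsRow.getAs[Number]("hi").longValue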

  3. If there are no columns which satisfy the above criteria, consider:

    • Creating a custom one and exposing it via a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine the functions that can be used here (DBMS_CRYPTO in Oracle, pgcrypto in PostgreSQL)*. A sketch of exposing such a hash through the dbtable subquery follows this step.
    • Using a set of independent columns which taken together provide high enough cardinality.

      Optionally, if you're going to write to a partitioned Hive table, you should consider including Hive partitioning columns. It might limit the number of files generated later.
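
    A minimal sketch of the first option, assuming PostgreSQL and two hypothetical text columns col_a and col_b; rather than creating a database view, it exposes the hash directly through the dbtable subquery (the md5-based expression is just one documented way to derive a 32-bit integer hash), and connectionUrl, devUserName and devPassword are assumed to be in scope as in the question:

      // Synthetic integer hash exposed as part_key; column names and numPartitions are placeholders.
      val hashedQuery =
        """(SELECT t.*,
                   ('x' || substr(md5(col_a || '|' || col_b), 1, 8))::bit(32)::int AS part_key
            FROM schema.tablename t) AS hashed"""

      val hashedDF = spark.read.format("jdbc")
        .option("url", connectionUrl)
        .option("user", devUserName)
        .option("password", devPassword)
        .option("dbtable", hashedQuery)
        .option("partitionColumn", "part_key")
        .option("lowerBound", Int.MinValue.toLong)
        .option("upperBound", Int.MaxValue.toLong)
        .option("numPartitions", 200)   // hypothetical; size it as in step 1
        .load()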

  4. Prepare partitioning arguments

    • If the column selected or created in the previous steps is numeric (or date / timestamp in Spark >= 2.4), provide it directly as the partitionColumn and use the range values determined before to fill lowerBound and upperBound.

      If the bound values don't reflect the properties of the data (min(col) for lowerBound, max(col) for upperBound), it can result in a significant data skew, so tread carefully. In the worst-case scenario, when the bounds don't cover the range of the data, all records will be fetched by a single machine, making it no better than no partitioning at all. A sketch of such a read follows.
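
      A minimal sketch of such a read, reusing cast_id and the connection details from the question; lowerBound and upperBound are assumed to come from the range check in step 2 and numPartitions from the sizing in step 1:

        val yearDF = spark.read.format("jdbc")
          .option("url", connectionUrl)
          .option("user", devUserName)
          .option("password", devPassword)
          .option("dbtable", "(select * from schema.tablename where period_year='2017' and period_num='12') as year2017")
          .option("partitionColumn", "cast_id")
          .option("lowerBound", lowerBound)         // min(cast_id) from step 2
          .option("upperBound", upperBound)         // max(cast_id) from step 2
          .option("numPartitions", numPartitions)   // from step 1
          .load()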

    • If the column selected in the previous steps is categorical, or is a set of columns, generate a list of mutually exclusive predicates that fully cover the data, in a form that can be used in a SQL WHERE clause.

      For example, if you have a column A with values {a1, a2, a3} and a column B with values {b1, b2, b3}:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = $a AND B = $b"
      

      Double-check that the conditions don't overlap and that all combinations are covered. If these conditions are not satisfied, you end up with duplicates or missing records respectively.

      Pass this data as the predicates argument to the jdbc call. Note that the number of partitions will be exactly equal to the number of predicates; a sketch follows.
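
      A minimal sketch of that call, reusing the predicates generated above and the connection details from the question (note that if A and B hold string values, each literal needs single quotes, i.e. s"A = '$a' AND B = '$b'"):

        import java.util.Properties

        val connectionProperties = new Properties()
        connectionProperties.setProperty("user", devUserName)
        connectionProperties.setProperty("password", devPassword)
        connectionProperties.setProperty("driver", "org.postgresql.Driver")

        // One input partition per predicate; no partitionColumn / bounds are needed here.
        val predicatedDF = spark.read.jdbc(
          connectionUrl,
          "schema.tablename",
          predicates.toArray,
          connectionProperties)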

  5. Put the database in read-only mode (any ongoing writes can cause data inconsistency). If possible, you should lock the database before you start the whole process, but that might not be possible in your organization.

  6. If the number of partitions matches the desired output, load the data without repartition and dump it directly to the sink; if not, you can try to repartition following the same rules as in step 1 (see the sketch below).
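
    A minimal sketch of the dump, reusing yearDF from the earlier sketch and assuming its columns already end with the Hive partition columns (source_system_name, period_year, period_num) in table order, as in the question:

      spark.sql("set hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

      yearDF.write
        .mode("overwrite")                 // behaves like INSERT OVERWRITE
        .insertInto("schema.hivetable")    // resolves columns by position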

  7. If you still experience any problems make sure that you've properly configured Spark memory and GC options.

  8. If none of the above works:

    • Consider dumping your data to network / distributed storage using tools like COPY TO and reading it directly from there.

      Note that for standard database utilities you will typically need a POSIX-compliant file system, so HDFS usually won't do.

      The advantage of this approach is that you don't need to worry about the column properties, and there is no need to put the data in read-only mode to ensure consistency.

    • Consider using dedicated bulk transfer tools, like Apache Sqoop, and reshaping the data afterwards.


* Don't use pseudocolumns - Pseudocolumn in Spark JDBC.
