How to optimize partitioning when migrating data from a JDBC source?


Problem description


I am trying to move data from a table in PostgreSQL to a Hive table on HDFS. To do that, I came up with the following code:

  val conf  = new SparkConf().setAppName("Spark-JDBC")
        .set("spark.executor.heartbeatInterval", "120s")
        .set("spark.network.timeout", "12000s")
        .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
        .set("spark.sql.orc.filterPushdown", "true")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.max", "512m")
        .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
        .set("spark.yarn.driver.memoryOverhead", "7168")
        .set("spark.yarn.executor.memoryOverhead", "7168")
        .set("spark.sql.shuffle.partitions", "61")
        .set("spark.default.parallelism", "60")
        .set("spark.memory.storageFraction", "0.5")
        .set("spark.memory.fraction", "0.6")
        .set("spark.memory.offHeap.enabled", "true")
        .set("spark.memory.offHeap.size", "16g")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

The data is inserted into the Hive table, dynamically partitioned based on prtn_String_columns: source_system_name, period_year, period_num.

Spark-submit used:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

The following error messages are generated in the executor logs:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

I see in the logs that the read is being executed properly with the given number of partitions as below:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

Below is the state of executors in stages:

The data is not being partitioned properly: one partition stays small while another becomes huge, so there is a skew problem here. While inserting the data into the Hive table the job fails at the line spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF"), but I understand this is happening because of the data skew problem.

I tried increasing the number of executors, the executor memory and the driver memory, and tried just saving the dataframe as a csv file instead of saving it into a Hive table, but nothing stops the execution from failing with the exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Is there anything in the code that I need to correct? Could anyone let me know how I can fix this problem?

Solution

  1. Determine how many partitions you need given the amount of input data and your cluster resources. As a rule of thumb, it is better to keep the input per partition under 1GB unless strictly necessary, and it should also be strictly smaller than the block size limit.

    You've previously stated that you migrate 1TB of data, so the values you have used in different posts (5 - 70) are likely way too low to ensure a smooth process.
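
    As a rough worked example based on the ~1GB-per-partition guideline above: 1TB of input at roughly 1GB per partition implies on the order of a thousand or more input partitions, not 5 - 70.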

    Try to use a value which won't require further repartitioning.

  2. Know your data.

    Analyze the columns available in the dataset to determine if there are any columns with high cardinality and a uniform distribution that can be spread among the desired number of partitions. These are good candidates for the import process. Additionally, you should determine the exact range of their values.

    Aggregations with different measures of centrality and skewness, as well as histograms and basic counts-by-key, are good exploration tools. For this part it is better to analyze the data directly in the database instead of fetching it into Spark.

    Depending on the RDBMS, you might be able to use width_bucket (PostgreSQL, Oracle) or an equivalent function to get a decent idea of how the data will be distributed in Spark after loading with partitionColumn, lowerBound, upperBound and numPartitions:

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
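
    A minimal sketch of running that check from Spark, reusing the connectionUrl, devUserName and devPassword values from the question and assuming cast_id with the 1 - 100000 bounds as the candidate column (adjust these to whatever you are evaluating):

    // Sketch only: the column, bounds and partition count below are the values under
    // evaluation, not recommendations.
    val partitionColumn = "cast_id"
    val lowerBound      = 1
    val upperBound      = 100000
    val numPartitions   = 70

    val distributionQuery =
      s"""(SELECT width_bucket($partitionColumn, $lowerBound, $upperBound, $numPartitions) AS bucket,
         |        COUNT(*) AS cnt
         |   FROM schema.tablename
         |  GROUP BY bucket) AS bucket_counts""".stripMargin

    spark.read.format("jdbc")
      .option("url", connectionUrl)
      .option("dbtable", distributionQuery)
      .option("user", devUserName)
      .option("password", devPassword)
      .load()
      .orderBy("bucket")
      .show(numPartitions, truncate = false)   // a few buckets holding most rows = skew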
    

  3. If there are no columns which satisfy the above criteria, consider:

    • Creating a custom one and exposing it via a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine the functions that can be used here (DBMS_CRYPTO in Oracle, pgcrypto in PostgreSQL)*. A sketch of this option follows after these bullets.
    • Using a set of independent columns which taken together provide high enough cardinality.

      Optionally, if you're going to write to a partitioned Hive table, you should consider including Hive partitioning columns. It might limit the number of files generated later.
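
      As an illustration of the first option, a hedged sketch for PostgreSQL: it derives a synthetic, roughly uniform bucket column from two assumed columns (col_a and col_b are placeholders, and hashtext is a PostgreSQL-internal hash function; a documented pgcrypto digest would work as well) and exposes it through the dbtable subquery:

      // Sketch only: col_a / col_b, the bucket count (60) and the use of hashtext are
      // assumptions for illustration, not values taken from the question.
      val bucketedTable =
        """(SELECT t.*,
          |        mod(abs(hashtext(concat_ws('|', col_a, col_b))), 60) AS part_bucket
          |   FROM schema.tablename t) AS src""".stripMargin

      val bucketedDF = spark.read.format("jdbc")
        .option("url", connectionUrl)
        .option("dbtable", bucketedTable)
        .option("user", devUserName)
        .option("password", devPassword)
        .option("partitionColumn", "part_bucket")   // the synthetic column, not a business key
        .option("lowerBound", "0")
        .option("upperBound", "60")
        .option("numPartitions", "60")
        .load()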

  4. Prepare partitioning arguments

    • If the column selected or created in the previous steps is numeric (or date / timestamp in Spark >= 2.4), provide it directly as the partitionColumn and use the range values determined before to fill lowerBound and upperBound.

      If the bound values don't reflect the properties of the data (min(col) for lowerBound, max(col) for upperBound) it can result in a significant data skew, so tread carefully. In the worst case scenario, when the bounds don't cover the range of the data, all records will be fetched by a single machine, making it no better than no partitioning at all.

    • If the column selected in the previous steps is categorical, or is a set of columns, generate a list of mutually exclusive predicates that fully cover the data, in a form that can be used in a SQL WHERE clause.

      For example if you have a column A with values {a1, a2, a3} and column B with values {b1, b2, b3}:

      val predicates = for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = '$a' AND B = '$b'"   // quote the literals if A and B are string columns
      

      Double check that conditions don't overlap and all combinations are covered. If these conditions are not satisfied you end up with duplicates or missing records respectively.

      Pass this data as the predicates argument of the jdbc call (a sketch follows below). Note that the number of partitions will be exactly equal to the number of predicates.
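
      A minimal sketch of that call, reusing connectionUrl, devUserName and devPassword from the question (the table name and the A / B columns are just the placeholders from the example above):

      import java.util.Properties

      // Sketch only: each predicate becomes exactly one partition, so 3 x 3 values -> 9 partitions.
      val connectionProperties = new Properties()
      connectionProperties.setProperty("user", devUserName)
      connectionProperties.setProperty("password", devPassword)
      connectionProperties.setProperty("driver", "org.postgresql.Driver")

      val predicatedDF = spark.read.jdbc(
        connectionUrl,
        "schema.tablename",
        predicates.toArray,        // one partition per predicate, no lower/upper bounds needed
        connectionProperties
      )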

  5. Put the database in read-only mode (any ongoing writes can cause data inconsistency). If possible you should lock the database before you start the whole process, but that might not be possible in your organization.

  6. If the number of partitions matches the desired output, load the data without repartition and dump it directly to the sink; if not, you can try to repartition following the same rules as in step 1.

  7. If you still experience any problems, make sure that you have properly configured the Spark memory and GC options.
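
    For example, extending the SparkConf built in the question (these particular values are illustrative only, not recommendations; the right numbers depend on your cluster):

    // Sketch only: illustrative memory / GC settings, to be tuned for your cluster.
    conf.set("spark.executor.memory", "16g")
        .set("spark.yarn.executor.memoryOverhead", "4096")   // extra off-heap headroom per executor
        .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")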

  8. If none of the above works:

    • Consider dumping your data to network / distributed storage using tools like COPY TO and reading it directly from there.

      Note that for standard database utilities you will typically need a POSIX-compliant file system, so HDFS usually won't do.

      The advantage of this approach is that you don't need to worry about the column properties, and there is no need to put the data in read-only mode to ensure consistency.

    • Consider using dedicated bulk transfer tools, like Apache Sqoop, and reshaping the data afterwards.


* Don't use pseudocolumns - Pseudocolumn in Spark JDBC.
