Apache Spark join with dynamic re-partition


Problem description

I'm trying to do a fairly straightforward join on two tables, nothing complicated. Load both tables, do a join, and update columns, but it keeps throwing an exception.

I noticed the task gets stuck on the last partition (199/200) and eventually crashes. My suspicion is that the data is skewed, causing all of it to be loaded into that last partition.

SELECT COUNT(DISTINCT report_audit) FROM ReportDs = 1.5 million.

SELECT COUNT(*) FROM ReportDs = 57 million.

CPU: 40 cores

Memory: 160 GB
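
Those counts mean each distinct report_audit maps to roughly 38 rows on average, so a few hot keys could easily dominate one shuffle partition. A quick way to check (a sketch, assuming the reportDs DataFrame defined in the code below; the column name is taken from the count query above):

  // Show the heaviest join keys; one huge count at the top confirms the skew theory.
  import org.apache.spark.sql.functions.desc
  reportDs.groupBy("report_audit")
          .count()
          .orderBy(desc("count"))
          .show(20, false)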

Here is my sample code:

// Imports needed by the snippet (the original "..." elides the surrounding object).
import java.util.Properties

import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

...
def main(args: Array[String]) {

  val log = LogManager.getRootLogger
  log.setLevel(Level.INFO)

  val conf = new SparkConf().setAppName("ExampleJob")
                          //.setMaster("local[*]")
                          //.set("spark.sql.shuffle.partitions", "3000")
                          //.set("spark.sql.crossJoin.enabled", "true")
                          .set("spark.storage.memoryFraction", "0.02")
                          .set("spark.shuffle.memoryFraction", "0.8")
                          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                          .set("spark.default.parallelism", (CPU * 3).toString)


  val sparkSession = SparkSession.builder()
                                 .config(conf)
                                 .getOrCreate()


  val reportOpts = Map(
              "url"     -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
              "driver"  -> "org.postgresql.Driver",
              "dbtable" -> "REPORT_TBL",
              "user"    -> DB_USER,
              "password"-> DB_PASSWORD,
              "partitionColumn" -> RPT_NUM_PARTITION,
              "lowerBound" -> RPT_LOWER_BOUND,
              "upperBound" -> RPT_UPPER_BOUND,
              "numPartitions" -> "200"
            )


  val accountOpts = Map(
                "url"     -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
                "driver"  -> "org.postgresql.Driver",
                "dbtable" -> ACCOUNT_TBL,
                "user"    -> DB_USER,
                "password"-> DB_PASSWORD,
                "partitionColumn" -> ACCT_NUM_PARTITION,
                "lowerBound" -> ACCT_LOWER_BOUND,
                "upperBound" -> ACCT_UPPER_BOUND,
                "numPartitions" -> "200"
              )

  val sc = sparkSession.sparkContext

  import sparkSession.implicits._

  val reportDs = sparkSession.read.format("jdbc").options(reportOpts).load.cache().alias("a")

  val accountDs = sparkSession.read.format("jdbc").options(accountOpts).load.cache().alias("c")

  val reportData =  reportDs.join(accountDs, reportDs("report_audit") === accountDs("reference_id"))
                                        .withColumn("report_name", when($"report_id" === "xxxx-xxx-asd", $"report_id_ref_1")
                                                                   .when($"report_id" === "demoasd-asdad-asda", $"report_id_ref_2")
                                                                   .otherwise($"report_id_ref_1" + ":" + $"report_id_ref_2"))
                                        .withColumn("report_version", when($"report_id" === "xxxx-xxx-asd", $"report_version_1")
                                                                       .when($"report_id" === "demoasd-asdad-asda", $"report_version_2")
                                                                       .otherwise($"report_version_3"))
                                        .withColumn("status", when($"report_id" === "xxxx-xxx-asd", $"report_status")
                                                                .when($"report_id" === "demoasd-asdad-asda", $"report_status_1")
                                                                .otherwise($"report_id"))
                                        .select("...")






  val prop = new Properties()
  prop.setProperty("user", DB_USER)
  prop.setProperty("password", DB_PASSWORD)
  prop.setProperty("driver", "org.postgresql.Driver")


  reportData.write
                  .mode(SaveMode.Append)
                  .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", "cust_report_data", prop)


  sparkSession.stop()
}

I think there should be an elegant way to handle this sort of data skew.
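
(For reference, one common mitigation is salting the skewed join key so hot keys spread across many partitions. A minimal sketch, not from the original post; the bucket count and the reuse of the column names above are assumptions:)

  import org.apache.spark.sql.functions._

  val saltBuckets = 16  // hypothetical; tune to the observed skew

  // Append a random salt (0 until saltBuckets) to the skewed side.
  val saltedReport = reportDs.withColumn("salt", (rand() * saltBuckets).cast("int"))

  // Explode the other side across all salt values so every original pair still matches.
  val saltedAccount = accountDs.withColumn("salt",
    explode(array((0 until saltBuckets).map(lit): _*)))

  val joined = saltedReport.join(saltedAccount,
    saltedReport("report_audit") === saltedAccount("reference_id") &&
    saltedReport("salt") === saltedAccount("salt"))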

Any advice would be appreciated.

Recommended answer

Your values for partitionColumn, upperBound, and lowerBound could cause this exact behavior if they aren't set correctly. For instance, if lowerBound == upperBound, then all of the data would be loaded into a single partition, regardless of numPartitions.

The combination of these attributes determines which (and how many) records get loaded into your DataFrame partitions from your SQL database.
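
To make that concrete, here is a sketch of how a partitioned JDBC read is split (the option names are Spark's documented JDBC options; the bound values are made up for illustration):

  // With numPartitions = 4, lowerBound = 0, upperBound = 100 on column "id",
  // Spark computes a stride of 25 and issues roughly one query per range:
  //   partition 0: WHERE id < 25 OR id IS NULL
  //   partition 1: WHERE id >= 25 AND id < 50
  //   partition 2: WHERE id >= 50 AND id < 75
  //   partition 3: WHERE id >= 75
  // The bounds only shape the strides; rows outside them are still read,
  // all of them by the first or last partition.
  val opts = Map(
    "url"             -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
    "driver"          -> "org.postgresql.Driver",
    "dbtable"         -> "REPORT_TBL",
    "user"            -> DB_USER,
    "password"        -> DB_PASSWORD,
    "partitionColumn" -> "id",   // must be a numeric column (newer Spark also allows date/timestamp)
    "lowerBound"      -> "0",    // illustrative; use the real MIN(id)
    "upperBound"      -> "100",  // illustrative; use the real MAX(id)
    "numPartitions"   -> "4"
  )
  val df = sparkSession.read.format("jdbc").options(opts).load()

So if lowerBound/upperBound don't bracket the actual range of the partition column (or collapse to the same value), most rows land in one partition, which matches the 199/200 symptom above.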
