Apache Spark join with dynamic repartitioning

Problem description

I'm trying to do a fairly straightforward join on two tables, nothing complicated: load both tables, join them, and update some columns. But it keeps throwing an exception.

I noticed that the job gets stuck on the last partition (199/200) and eventually crashes. My suspicion is that the data is skewed, causing all of it to be loaded into the last partition, 199.

SELECT COUNT(DISTINCT report_audit) FROM ReportDs = 1.5 million,

while

SELECT COUNT(*) FROM ReportDs = 57 million.

Cluster details: CPU: 40 cores, Memory: 160G.

Here is my sample code:

...
def main(args: Array[String]): Unit = {

  val log = LogManager.getRootLogger
  log.setLevel(Level.INFO)

  val conf = new SparkConf().setAppName("ExampleJob")
                          //.setMaster("local[*]")
                          //.set("spark.sql.shuffle.partitions", "3000")
                          //.set("spark.sql.crossJoin.enabled", "true")
                          .set("spark.storage.memoryFraction", "0.02")
                          .set("spark.shuffle.memoryFraction", "0.8")
                          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                          .set("spark.default.parallelism", (CPU * 3).toString)


  val sparkSession = SparkSession.builder()
                                 .config(conf)
                                 .getOrCreate()


  val reportOpts = Map(
              "url"     -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
              "driver"  -> "org.postgresql.Driver",
              "dbtable" -> "REPORT_TBL",
              "user"    -> DB_USER,
              "password"-> DB_PASSWORD,
              "partitionColumn" -> RPT_NUM_PARTITION,
              "lowerBound" -> RPT_LOWER_BOUND,
              "upperBound" -> RPT_UPPER_BOUND,
              "numPartitions" -> "200"
            )


  val accountOpts = Map(
                "url"     -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
                "driver"  -> "org.postgresql.Driver",
                "dbtable" -> ACCOUNT_TBL,
                "user"    -> DB_USER,
                "password"-> DB_PASSWORD,
                "partitionColumn" -> ACCT_NUM_PARTITION,
                "lowerBound" -> ACCT_LOWER_BOUND,
                "upperBound" -> ACCT_UPPER_BOUND,
                "numPartitions" -> "200"
              )

  val sc = sparkSession.sparkContext;

  import sparkSession.implicits._

  val reportDs = sparkSession.read.format("jdbc").options(reportOpts).load.cache().alias("a")

  val accountDs = sparkSession.read.format("jdbc").options(accountOpts).load.cache().alias("c")

  val reportData =  reportDs.join(accountDs, reportDs("report_audit") === accountDs("reference_id"))
                                        .withColumn("report_name", when($"report_id" === "xxxx-xxx-asd", $"report_id_ref_1")
                                                                   .when($"report_id" === "demoasd-asdad-asda", $"report_id_ref_2")
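                                                                   // Column.+ is arithmetic addition, not string concatenation;
                                                                   // concat and lit come from org.apache.spark.sql.functions.
                                                                   .otherwise(concat($"report_id_ref_1", lit(":"), $"report_id_ref_2")))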
                                        .withColumn("report_version", when($"report_id" === "xxxx-xxx-asd", $"report_version_1")
                                                                       .when($"report_id" === "demoasd-asdad-asda", $"report_version_2")
                                                                       .otherwise($"report_version_3"))
                                        .withColumn("status", when($"report_id" === "xxxx-xxx-asd", $"report_status")
                                                                .when($"report_id" === "demoasd-asdad-asda", $"report_status_1")
                                                                .otherwise($"report_id"))
                                        .select("...")






  val prop = new Properties()
  prop.setProperty("user", DB_USER)
  prop.setProperty("password", DB_PASSWORD)
  prop.setProperty("driver", "org.postgresql.Driver")


  reportData.write
                  .mode(SaveMode.Append)
                  .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", "cust_report_data", prop)


  sparkSession.stop()
}

I think there should be an elegant way to handle this sort of data skew.

Recommended answer

Your values for partitionColumn, upperBound, and lowerBound could cause this exact behavior if they aren't set correctly. For instance, if lowerBound == upperBound, then all of the data would be loaded into a single partition, regardless of numPartitions.

The combination of these attributes determines which (and how many) records get loaded into each partition of your DataFrame from your SQL database.
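To make the effect of the bounds concrete, here is a minimal sketch in plain Scala (no Spark dependency) of how a partitioned JDBC read can be split into per-partition WHERE clauses. It is a simplified approximation of Spark's behavior, not Spark's actual source; the object name JdbcPartitionSketch, the function whereClauses, and the column name id are hypothetical and exist only for illustration. The point it illustrates is that the bounds only set the stride and do not filter rows, so anything below lowerBound falls into the first partition and anything above upperBound falls into the last one.

```scala
// Simplified sketch of how a JDBC read is split into per-partition WHERE clauses.
// This approximates Spark's behaviour; it is not Spark's actual source code.
object JdbcPartitionSketch {

  def whereClauses(column: String, lower: Long, upper: Long, requested: Int): Seq[String] = {
    require(lower <= upper, "lowerBound must not exceed upperBound")
    // Never create more partitions than there are values between the bounds.
    val n = math.min(requested.toLong, upper - lower).toInt
    if (n <= 1) {
      // Covers lowerBound == upperBound: a single partition reads the whole table.
      Seq("<no predicate: whole table>")
    } else {
      val stride = upper / n - lower / n
      (0 until n).map { i =>
        val lo = lower + i * stride
        val hi = lo + stride
        if (i == 0) s"$column < $hi OR $column IS NULL"   // open-ended below
        else if (i == n - 1) s"$column >= $lo"            // open-ended above
        else s"$column >= $lo AND $column < $hi"
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints one predicate per partition for bounds 0..1000 and 4 partitions.
    whereClauses("id", 0L, 1000L, 4).foreach(println)
  }
}
```

Under these assumptions, the last predicate has no upper limit. If upperBound is far below the real maximum of the partition column, most rows match only that last predicate, which would be consistent with a job that stalls on partition 199 of 200. Checking the column's actual MIN and MAX in the database and using those as the bounds is one way to rule this out.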
