Spark doing exchange of partitions already correctly distributed


Problem Description

I am joining two datasets on two columns, and the result is a dataset containing 55 billion rows. After that I have to do some aggregation on this dataset by a different column than the ones used in the join. The problem is that Spark performs an exchange of partitions after the join (which takes too much time with 55 billion rows), even though the data is already correctly distributed, because the aggregation column is unique. I know that the aggregation key is correctly distributed; is there a way to tell this to the Spark application?

Solution

1) Go to the Spark UI and check the "Locality Level" of the tasks
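Locality levels range from PROCESS_LOCAL (best) down to ANY; many tasks at RACK_LOCAL or ANY mean data is being fetched over the network. A minimal sketch, assuming a local SparkSession, of tuning the `spark.locality.wait` setting that controls how long the scheduler waits for a data-local slot (the value shown is illustrative, not a recommendation):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: wait longer for a data-local slot before falling back to a
// less local one. "6s" is illustrative; the Spark default is 3s.
val spark = SparkSession.builder()
  .appName("LocalityTuning")
  .master("local[*]")
  .config("spark.locality.wait", "6s")
  .getOrCreate()
```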

2) If joining a large and a small dataset, use a broadcast join
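A minimal sketch of a broadcast join with the DataFrame API; `largeDF` and `smallDF` are hypothetical names standing in for the real datasets. The `broadcast()` hint tells Spark to ship the small side to every executor, so the large side is joined map-side without an exchange of its partitions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .appName("BroadcastJoin")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical example data; in practice largeDF would be billions of rows.
val largeDF = Seq((1, "a"), (2, "b"), (3, "c")).toDF("key", "value")
val smallDF = Seq((1, "x"), (2, "y")).toDF("key", "ref")

// broadcast() marks smallDF to be replicated to every executor,
// so largeDF is joined without shuffling its partitions.
val joined = largeDF.join(broadcast(smallDF), Seq("key"))
joined.show()
```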

3) If joining a large and a medium-sized dataset, and the medium-sized RDD does not fit fully into memory, filter the large RDD first

// Collect the join keys of the medium RDD and broadcast them to all executors.
val keys = sc.broadcast(mediumRDD.map(_._1).collect.toSet)
// Drop large-RDD rows whose key has no match, shrinking the shuffle that follows.
val reducedRDD = largeRDD.filter { case (key, value) => keys.value.contains(key) }
reducedRDD.join(mediumRDD)

4) Check whether the data is serialized efficiently (e.g. with Kryo)

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryoserializer.buffer", "64m")
  .registerKryoClasses(
    Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]])
  )

5) Check on the Spark UI, or add the following line in code for debugging:

df.rdd.getNumPartitions

In Spark's application UI (see the screenshot in the original answer), the "Total Tasks" count for a stage represents the number of partitions.
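A minimal sketch, using a hypothetical DataFrame `df` with an aggregation column `aggCol` standing in for the 55-billion-row join result, of inspecting the partition count and repartitioning on the aggregation key once, so the subsequent groupBy can reuse that hash distribution instead of introducing another exchange:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder()
  .appName("PartitionCheck")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the large join result.
val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("aggCol", "value")

println(df.rdd.getNumPartitions) // partition count before

// Repartition on the aggregation key; the groupBy below matches this
// hash distribution, so it does not need a second exchange.
val byKey = df.repartition($"aggCol")
println(byKey.rdd.getNumPartitions)

val agg = byKey.groupBy("aggCol").agg(count("*"))
agg.show()
```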

