Spark doing exchange of partitions already correctly distributed


Question

I am joining two datasets on two columns, and the result is a dataset containing 55 billion rows. After that I have to run an aggregation on this dataset, grouped by a column different from the ones used in the join. The problem is that Spark performs an exchange of partitions after the join (which takes too much time with 55 billion rows), even though the data is already correctly distributed, because the aggregation column is unique. I know that the aggregation key is correctly distributed; is there a way to tell this to the Spark application?
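For context, a minimal sketch of the shape of the problem (the dataset and column names `left`, `right`, `k1`, `k2`, `uniqueId` are placeholders, not from the original post): the join shuffles on the join keys, and the later groupBy on a different column forces a second exchange.

import org.apache.spark.sql.functions.count

// left and right are assumed to be existing DataFrames.
// Join on (k1, k2): Spark hash-partitions both sides on the join keys (exchange #1).
val joined = left.join(right, Seq("k1", "k2"))

// Aggregating on a different column ("uniqueId") makes Catalyst insert another
// Exchange (hash-partitioning on "uniqueId") before the aggregation (exchange #2),
// even if each "uniqueId" value already lives in exactly one partition.
val result = joined.groupBy("uniqueId").agg(count("*"))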

Recommended Answer

1) Go to the Spark UI and check the "Locality Level".

2) If you are joining a large dataset with a small one, use a broadcast join (see the sketch below).
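A minimal sketch of a broadcast join, assuming `largeDS` and `smallDS` are existing DataFrames and `k1`, `k2` are the join columns (all placeholder names):

import org.apache.spark.sql.functions.broadcast

// Broadcasting the small side avoids shuffling the large side at all:
// each executor receives a full copy of smallDS and joins locally.
val joined = largeDS.join(broadcast(smallDS), Seq("k1", "k2"))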

3) If you are joining a large dataset with a medium-sized one, and the medium-sized RDD does not fit fully into memory, use a filter first:

// Broadcast the join keys of the medium RDD to every executor, then keep only
// the rows of the large RDD whose key actually appears in the medium RDD.
val keys = sc.broadcast(mediumRDD.map(_._1).collect.toSet)
val reducedRDD = largeRDD.filter { case (key, _) => keys.value.contains(key) }
reducedRDD.join(mediumRDD)

4) Check whether your data is serialized efficiently (for example, with Kryo):

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.max", "128m")
      .set("spark.kryoserializer.buffer", "64m")
      .registerKryoClasses(
        Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]])
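Kryo serialization is generally faster and more compact than the default Java serialization, which reduces the amount of data written during shuffles; registering the classes that are actually shuffled keeps Kryo from writing full class names with every record.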

5) Check the Spark UI, or add the following line to the code for debugging:

df.rdd.getNumPartitions

In Spark's application UI, the "Total Tasks" count shown for a stage corresponds to the number of partitions; a small debugging sketch follows.
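As a debugging aid, a short sketch (reusing the placeholder names `joined` and `uniqueId` from the earlier example) that prints the partition count at each step and the physical plan, where any Exchange nodes Spark plans to insert are visible:

import org.apache.spark.sql.functions.count

println(s"after join: ${joined.rdd.getNumPartitions} partitions")

val aggregated = joined.groupBy("uniqueId").agg(count("*"))
println(s"after aggregation: ${aggregated.rdd.getNumPartitions} partitions")

// The physical plan lists every Exchange (shuffle) Spark intends to perform.
aggregated.explain()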
