Spark join exponentially slow


Problem description

I am trying to join two Spark RDDs. I have a transaction log which is linked to categories, and I have formatted my transaction RDD to have the category id as the key.

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]

The transaction log is about 20 GB (350 million lines). The category list is less than 1 KB.

When I run

transactions_cat.join(categories).count()

Spark becomes very slow. I have a stage with 643 tasks. The first 10 tasks took about 1 minute, and each task then gets slower and slower (approximately 15 minutes around the 60th task). I am not sure what's wrong.

Please check these screenshots to get a better idea.

I am running Spark 1.1.0 with 4 workers using the Python shell, with a total memory of 50 GB. Counting the transactions RDD on its own is quite fast (30 min).

Recommended answer

What's wrong is probably that Spark isn't noticing that you have an easy case of the join problem. When one of the two RDDs you're joining is this small, you're better off with it not being an RDD at all. Then you can roll your own implementation of a hash join, which is actually a lot simpler than it sounds. Basically, you need to:

  • Pull your category list out of the RDD using collect() -- the resulting communication will easily pay for itself (or, if possible, don't make it an RDD in the first place)
  • Turn it into a hash table with one entry containing all the values for one key (assuming your keys are not unique)
  • For each pair in your large RDD, look the key up in the hash table and produce one pair for each value in the list (if the key is not found, that particular pair produces no results) -- see the sketch below this list
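
For reference, here is a minimal PySpark sketch of that recipe, assuming a live SparkContext named sc and the transactions_cat and categories RDDs from the question; it illustrates the approach and is not the answerer's Scala implementation.

from collections import defaultdict

# Pull the small RDD down to the driver and build a hash table mapping
# each category id to all of its values.
category_map = defaultdict(list)
for key, value in categories.collect():
    category_map[key].append(value)

# Ship the table to every executor once instead of shuffling the 20 GB RDD.
category_bc = sc.broadcast(dict(category_map))

# Map-side lookup: keys missing from the table produce nothing, which
# mimics an inner join.
def hash_join(pair):
    key, transaction = pair
    return [(key, (transaction, v)) for v in category_bc.value.get(key, [])]

joined = transactions_cat.flatMap(hash_join)
joined.count()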

I have an implementation in Scala -- feel free to ask questions about translating it, but it should be pretty easy.

Another interesting possibility is to try using Spark SQL. I'm pretty sure the project's long-term ambitions include doing this for you automatically, but I don't know if they've achieved it yet.
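
As a rough illustration of where that idea went, later Spark releases (the DataFrame API from 1.3 onwards; pyspark.sql.functions.broadcast from around 1.6) let you request exactly this map-side join. The snippet below is a hedged sketch, not part of the original answer: it presumes an active SparkSession and invents column names for the data in the question.

from pyspark.sql.functions import broadcast

# Column names here are illustrative only; the second column of
# transactions_cat is kept as a single array of fields.
tx_df = transactions_cat.toDF(["category_id", "record"])
cat_df = categories.toDF(["category_id", "flag"])

# broadcast() hints that cat_df is tiny, so Spark copies it to every
# executor and joins map-side instead of shuffling the transaction log.
joined_df = tx_df.join(broadcast(cat_df), "category_id")
joined_df.count()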
