Spark Dataframes: Skewed Partition after Join


Problem Description

I have two dataframes, df1 with 22 million records and df2 with 2 million records. I'm doing a right join on email_address as the key.

test_join = df2.join(df1, "email_address", how = 'right').cache()

There are very few duplicate emails (if any) in either dataframe. After the join I'm trying to find the partition sizes of the resulting dataframe test_join, using this code:

l = test_join.rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
print(max(l, key=lambda item: item[1]), min(l, key=lambda item: item[1]))

The result shows that the largest partition is around 100 times bigger than the average partition size. This skew in partition sizes causes performance problems in post-join transformations and actions.

I know I can repartition it evenly after the join with repartition(num_partitions), but my question is why I am getting this uneven partitioning in the first place, and whether there is any way to avoid it.
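
For reference, a minimal sketch of that post-join repartition (the partition count of 200 below is just an illustrative number, not something from the original post):

# Rebalance the joined dataframe after the fact; tune the partition count
# to your cluster and data size.
test_join = test_join.repartition(200)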

P.S.: Just to check the assumption that the problem is only with the email_address hashing, I also checked the partition sizes after a couple of other joins, and I saw the same issue with a numeric join key as well.

Answer

@user6910411, you were spot on. The problem was with my data: some dumb convention was being followed for entering empty emails, and those placeholder keys were causing the skew.
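
A minimal sketch of working around that, assuming the placeholder is an empty string (the actual convention isn't stated here), so the placeholder rows never pile up on a single partition during the join:

from pyspark.sql import functions as F

# Hypothetical placeholder value; adjust the predicate to whatever the
# "empty email" convention actually looks like in your data.
EMPTY_EMAIL = ""

# Join only the rows that have a real key; the placeholder rows would
# otherwise all hash to the same partition and cause the skew.
clean_df1 = df1.filter(F.col("email_address") != EMPTY_EMAIL)
test_join = df2.join(clean_df1, "email_address", how='right')

# If the placeholder rows are still needed, union them back afterwards
# (allowMissingColumns needs Spark 3.1+; df2's columns come back as null).
placeholder_rows = df1.filter(F.col("email_address") == EMPTY_EMAIL)
test_join = test_join.unionByName(placeholder_rows, allowMissingColumns=True)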

Upon inspecting the entries in the largest partition, I figured out what was going on in there. I found this debugging technique quite useful, and I'm sure it can help others facing the same issue.

By the way, this is the function I wrote to find the skewness of an RDD's partitions:

from itertools import islice

def check_skewness(df):
    # Take just a 1% sample to keep the check fast.
    sampled_rdd = df.sample(False, 0.01).rdd.cache()
    # Count the rows in each partition of the sample.
    l = sampled_rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
    max_part = max(l, key=lambda item: item[1])
    min_part = min(l, key=lambda item: item[1])
    # Flag skew if the largest partition is more than 5 times the smallest.
    if max_part[1] > 5 * min_part[1]:
        print('Partitions Skewed: Largest Partition', max_part, 'Smallest Partition', min_part,
              '\nSample Content of the largest Partition: \n')
        print(sampled_rdd.mapPartitionsWithIndex(
            lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
    else:
        print('No Skewness: Largest Partition', max_part, 'Smallest Partition', min_part)

Then I just pass in the dataframe whose skewness I want to check, like this:

check_skewness(test_join)

and it gives me a nice summary of its skewness.
