如何处理外连接的火花数据框中的数据倾斜 [英] How to handle data skew in the spark data frame for outer join

查看:17
本文介绍了如何处理外连接的火花数据框中的数据倾斜的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个数据框,我正在 5 列上执行外连接.以下是我的数据集示例.

I have two data frames and I am performing outer join on 5 columns . Below is example of my data set .

uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|FinancialAsReportedLineItemName.languageId|^|FinancialStatementLineItemValue|^|AdjustedForCorporateActionValue|^|ReportedCurrencyId|^|IsAsReportedCurrencySetManually|^|Unit|^|IsTotal|^|StatementSectionCode|^|DimentionalLineItemId|^|IsDerived|^|EstimateMethodCode|^|EstimateMethodNote|^|EstimateMethodNote.languageId|^|FinancialLineItemSource|^|IsCombinedItem|^|IsExcludedFromStandardization|^|DocByteOffset|^|DocByteLength|^|BookMark|^|ItemDisplayedNegativeFlag|^|ItemScalingFactor|^|ItemDisplayedValue|^|ReportedValue|^|EditedDescription|^|EditedDescription.languageId|^|ReportedDescription|^|ReportedDescription.languageId|^|AsReportedInstanceSequence|^|PhysicalMeasureId|^|FinancialStatementLineItemSequence|^|SystemDerivedTypeCode|^|AsReportedExchangeRate|^|AsReportedExchangeRateSourceCurrencyId|^|ThirdPartySourceCode|^|FinancialStatementLineItemValueUpperRange|^|FinancialStatementLineItemLocalLanguageLabel|^|FinancialStatementLineItemLocalLanguageLabel.languageId|^|IsFinal|^|FinancialStatementLineItem.lineItemInstanceKey|^|StatementSectionIsCredit|^|CapitalChangeAdjustmentDate|^|ParentLineItemId|^|EstimateMethodId|^|StatementSectionId|^|SystemDerivedTypeCodeId|^|UnitEnumerationId|^|FiscalYear|^|IsAnnual|^|PeriodPermId|^|PeriodPermId.objectTypeId|^|PeriodPermId.objectType|^|AuditID|^|AsReportedItemId|^|ExpressionInstanceId|^|ExpressionText|^|FFAction|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|221|^|Average Age of Employees|^|505074|^|30.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235002211206722736|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|498|^|Shareholders' Equity Per Share|^|505074|^|91.37000|^|678.74654|^|500186|^|False|^|1.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|474880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235004981302988315|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|500|^|Number of Shares Outstanding at Period End-Common Shares|^|505074|^|90000000.00000|^|12115420.96161|^||^|False|^|1000.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|3|^||^||^||^|505074|^||^|505074|^||^||^|499712|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235005001178855709|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010067|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|562|^|Number of Employees|^|505074|^|2924.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|464864|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235005621461877526|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|655|^|Total number of shareholders|^|505074|^|11792.00000|^||^||^|False|^|1.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|466927|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235006551335570418|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|657|^|Total dividends paid (common stock)|^|505074|^|540000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|233463|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|12350065712483219|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1452|^|Order received|^|505074|^|26936000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014521608462544|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1453|^|Order backlogs|^|505074|^|1447000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014531922884465|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1457|^|Export amount|^|505074|^|3924000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|291829|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014571728332413|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1459|^|Capital expenditures (Note)|^|505074|^|659000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014591148256870|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|255|^|Number of Employees|^|505074|^|10152.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|12288|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236002551128894330|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|256|^|Average Age of Employees|^|505074|^|34.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236002561111316467|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|542|^|Shareholders' Equity Per Share|^|505074|^|160.20000|^|691.93184|^|500186|^|False|^|1.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|471038|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236005421170597389|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|545|^|Number of Shares Outstanding at Period End-Common Shares|^|505074|^|679468000.00000|^|157314300.64243|^||^|False|^|1000.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|3|^||^||^||^|505074|^||^|505074|^||^||^|472064|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236005451445165969|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010067|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|718|^|Total dividends paid (common stock)|^|505074|^|4750000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|458752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236007181118043352|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|1364|^|Export amount|^|505074|^|15379000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|459752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236013641649895533|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|1407|^|Total number of shareholders|^|505074|^|57288.00000|^||^||^|False|^|1.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|460752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236014071623011361|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|

第二个数据集的结构也一样

The structure of the second data set is also same

我在前 5 列上表演.正如您所看到的,所有前 5 列的组合没有为我提供足够的分区,这导致数据倾斜.

I am performing on first 5 columns . As you can see the combination of all first 5 columns does not provide me enough partition and that leads to data skew .

spark 作业卡在某些 Executor 上.

The spark job stuck on some of the Executor .

第一个数据集的大小为 270 GB,第二个为 5 GB,但预计会增加.

The size of the first dataset is 270 GB and second is 5 GB but expected to increase .

分区总数 1128

这就是我执行加入的方式

This is how I perform my join

val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))

我尝试实施广播加入但没有影响.

I tried implementing Broadcast Join but no impact .

所以在这种情况下,我可以在加入键上使用加盐或散列,以便加入键变得随机并且我猜不会发生偏斜.

So in this case can I use salting or hashing on join key so that the joining key will become random and skew will not occur I guess .

这是我的查询和应用详情

Here is my query and app details

这是我们加载数据时的集群详细信息.

Here is the cluster details when we are loading the data .

当大部分容器空闲时,这里是集群详细信息.

And here is cluster details when most of the container is idle.

添加任务的详细信息,有些是 10,有些执行器只有 3 到 4.

Adding the details of the task where some are 10 and on some executor only 3 to 4 .

推荐答案

请考虑以下几点:

1) 由于您有 60 个执行程序和每个执行程序 10 个内核,因此您的分区应至少为 60 x 10 = 600 个分区

1) Since you have 60 executors and 10 cores per executor your partitions should be at least 60 x 10 = 600 partitions

2) 在您的情况下,您有 270GB/1128 ~ 241MB,这应该大约是对我来说看起来相当大的分区大小(考虑改组期间的数据交换).首先尝试重新分区到更现实的东西,例如 8K 甚至 16K.

2) In your case you have 270GB / 1128 ~ 241MB this should approximately be the partition size which looks quite big to me (considering data exchange during shuffling). Try first to re-partition to something more realistic for instance 8K or even 16K.

3) 由于我无法清楚地看到有多少 executors 参与作业执行,您需要再次检查并确定参与的 executors 的确切数量以及数据是否均匀分布.如果执行者之间的数据偏差很小,那么您的数据分布良好,否则您将面临倾斜.

3) Since I can not see clearly how many executors participate on job execution you need to check it again and figure out the exact number of participating executors and if data is equally distributed. If data deviation between executors is low then your data is well distributed otherwise you face skewing.

4) 如果在重新分区后偏斜坚持尝试重新分配连接键,如此处

4) If after re-partition skewing insists try to redistribute the join keys as described here

这篇关于如何处理外连接的火花数据框中的数据倾斜的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆