如何处理Spark数据帧中的数据偏斜以进行外部联接 [英] How to handle data skew in the spark data frame for outer join

查看:73
本文介绍了如何处理Spark数据帧中的数据偏斜以进行外部联接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个数据框,并且正在5列上执行外部联接. 以下是我的数据集示例.

I have two data frames and I am performing outer join on 5 columns . Below is example of my data set .

uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|FinancialAsReportedLineItemName.languageId|^|FinancialStatementLineItemValue|^|AdjustedForCorporateActionValue|^|ReportedCurrencyId|^|IsAsReportedCurrencySetManually|^|Unit|^|IsTotal|^|StatementSectionCode|^|DimentionalLineItemId|^|IsDerived|^|EstimateMethodCode|^|EstimateMethodNote|^|EstimateMethodNote.languageId|^|FinancialLineItemSource|^|IsCombinedItem|^|IsExcludedFromStandardization|^|DocByteOffset|^|DocByteLength|^|BookMark|^|ItemDisplayedNegativeFlag|^|ItemScalingFactor|^|ItemDisplayedValue|^|ReportedValue|^|EditedDescription|^|EditedDescription.languageId|^|ReportedDescription|^|ReportedDescription.languageId|^|AsReportedInstanceSequence|^|PhysicalMeasureId|^|FinancialStatementLineItemSequence|^|SystemDerivedTypeCode|^|AsReportedExchangeRate|^|AsReportedExchangeRateSourceCurrencyId|^|ThirdPartySourceCode|^|FinancialStatementLineItemValueUpperRange|^|FinancialStatementLineItemLocalLanguageLabel|^|FinancialStatementLineItemLocalLanguageLabel.languageId|^|IsFinal|^|FinancialStatementLineItem.lineItemInstanceKey|^|StatementSectionIsCredit|^|CapitalChangeAdjustmentDate|^|ParentLineItemId|^|EstimateMethodId|^|StatementSectionId|^|SystemDerivedTypeCodeId|^|UnitEnumerationId|^|FiscalYear|^|IsAnnual|^|PeriodPermId|^|PeriodPermId.objectTypeId|^|PeriodPermId.objectType|^|AuditID|^|AsReportedItemId|^|ExpressionInstanceId|^|ExpressionText|^|FFAction|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|221|^|Average Age of Employees|^|505074|^|30.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235002211206722736|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|498|^|Shareholders' Equity Per Share|^|505074|^|91.37000|^|678.74654|^|500186|^|False|^|1.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|474880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235004981302988315|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|500|^|Number of Shares Outstanding at Period End-Common Shares|^|505074|^|90000000.00000|^|12115420.96161|^||^|False|^|1000.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|3|^||^||^||^|505074|^||^|505074|^||^||^|499712|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235005001178855709|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010067|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|562|^|Number of Employees|^|505074|^|2924.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|464864|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235005621461877526|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|655|^|Total number of shareholders|^|505074|^|11792.00000|^||^||^|False|^|1.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|466927|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235006551335570418|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|657|^|Total dividends paid (common stock)|^|505074|^|540000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|233463|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|12350065712483219|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1452|^|Order received|^|505074|^|26936000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014521608462544|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1453|^|Order backlogs|^|505074|^|1447000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014531922884465|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1457|^|Export amount|^|505074|^|3924000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|291829|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014571728332413|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1459|^|Capital expenditures (Note)|^|505074|^|659000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014591148256870|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|255|^|Number of Employees|^|505074|^|10152.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|12288|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236002551128894330|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|256|^|Average Age of Employees|^|505074|^|34.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236002561111316467|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|542|^|Shareholders' Equity Per Share|^|505074|^|160.20000|^|691.93184|^|500186|^|False|^|1.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|471038|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236005421170597389|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|545|^|Number of Shares Outstanding at Period End-Common Shares|^|505074|^|679468000.00000|^|157314300.64243|^||^|False|^|1000.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|3|^||^||^||^|505074|^||^|505074|^||^||^|472064|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236005451445165969|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010067|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|718|^|Total dividends paid (common stock)|^|505074|^|4750000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|458752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236007181118043352|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|1364|^|Export amount|^|505074|^|15379000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|459752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236013641649895533|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|1407|^|Total number of shareholders|^|505074|^|57288.00000|^||^||^|False|^|1.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|460752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236014071623011361|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|

第二个数据集的结构也相同

The structure of the second data set is also same

我正在前5列上表演. 如您所见,所有前5列的组合不能为我提供足够的分区,从而导致数据偏斜.

I am performing on first 5 columns . As you can see the combination of all first 5 columns does not provide me enough partition and that leads to data skew .

火花作业卡在某些执行器上.

The spark job stuck on some of the Executor .

第一个数据集的大小为270 GB,第二个数据集的大小为5 GB,但预计会增加.

The size of the first dataset is 270 GB and second is 5 GB but expected to increase .

分区1128的总数

Total no of partition 1128

这就是我执行加入的方式

This is how I perform my join

val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))

我尝试实施Broadcast Join,但没有影响.

I tried implementing Broadcast Join but no impact .

因此,在这种情况下,我可以对连接密钥使用加盐或散列,以便使连接密钥变得随机并且不会发生偏斜.

So in this case can I use salting or hashing on join key so that the joining key will become random and skew will not occur I guess .

这是我的查询和应用详细信息

Here is my query and app details

这是我们加载数据时的集群详细信息.

Here is the cluster details when we are loading the data .

这是大多数容器空闲时的群集详细信息.

And here is cluster details when most of the container is idle.

添加任务的详细信息(某些任务为10,而某些执行程序为3至4).

Adding the details of the task where some are 10 and on some executor only 3 to 4 .

推荐答案

请考虑以下几点:

1)由于您有60个执行程序,每个执行程序有10个核心,因此您的分区至少应为60 x 10 = 600个分区

1) Since you have 60 executors and 10 cores per executor your partitions should be at least 60 x 10 = 600 partitions

2)在您的情况下,您有270GB/1128〜241MB,这应该大约是对我来说看起来很大的分区大小(考虑改组期间的数据交换).首先尝试重新分区为更实际的内容,例如8K甚至16K.

2) In your case you have 270GB / 1128 ~ 241MB this should approximately be the partition size which looks quite big to me (considering data exchange during shuffling). Try first to re-partition to something more realistic for instance 8K or even 16K.

3)由于我无法清楚地看到有多少执行者参与工作执行,因此您需要再次检查并找出参与执行者的确切人数以及数据是否平均分配.如果执行程序之间的数据偏差很小,那么您的数据将分布良好,否则您将面临偏差.

3) Since I can not see clearly how many executors participate on job execution you need to check it again and figure out the exact number of participating executors and if data is equally distributed. If data deviation between executors is low then your data is well distributed otherwise you face skewing.

4)如果在重新分区倾斜后仍然坚持尝试按照

4) If after re-partition skewing insists try to redistribute the join keys as described here

这篇关于如何处理Spark数据帧中的数据偏斜以进行外部联接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆