Spark SQL query causing huge data shuffle read / write


Problem description

I am using Spark SQL to process the data. Here is the query:

select 
    /*+ BROADCAST (C) */ A.party_id, 
    IF(B.master_id is NOT NULL, B.master_id, 'MISSING_LINK') as master_id, 
    B.is_matched, 
    D.partner_name, 
    A.partner_id, 
    A.event_time_utc, 
    A.funnel_stage_type, 
    A.product_id_set, 
    A.ip_address, 
    A.session_id, 
    A.tdm_retailer_id, 
    C.product_name ,
    CASE WHEN C.product_category_lvl_01 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_01 END as product_category_lvl_01,
    CASE WHEN C.product_category_lvl_02 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_02 END as product_category_lvl_02,
    CASE WHEN C.product_category_lvl_03 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_03 END as product_category_lvl_03,
    CASE WHEN C.product_category_lvl_04 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_04 END as product_category_lvl_04, 
    C.brand_name 
from 
    browser_data A 
    INNER JOIN (select partner_name, partner_alias_tdm_id as npa_retailer_id from npa_retailer) D  
        ON (A.tdm_retailer_id = D.npa_retailer_id) 
    LEFT JOIN 
        (identity as B1 INNER JOIN (select random_val from random_distribution) B2) as B 
        ON (A.party_id = B.party_id and A.random_val = B.random_val) 
    LEFT JOIN product_taxonomy as C 
        ON (A.product_id = C.product_id and D.npa_retailer_id = C.retailer_id)

Where browser_data A is around 110 GB of data, with 519 million records.

D is a small dataset that maps to only one value in A. As it is small, Spark SQL automatically broadcasts it (confirmed in the execution plan from explain).

B is 5 GB with 45 million records and contains only 3 columns. This dataset is replicated 30 times (via a Cartesian product with a dataset containing the values 0 to 29) so that the skewed-key issue (a lot of data in dataset A against a single key) is solved. This results in 150 GB of data.
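
(Roughly, in DataFrame terms, the replicate-and-salt idea described above looks like the following simplified sketch; the DataFrame and column names here are illustrative assumptions, not the exact ones used in the job.)

import org.apache.spark.sql.functions._

// Sketch of the skew-handling ("salting") technique described above, assuming
// DataFrames browserData (A) and identityDf (B) that share the join key party_id.
val numSalts = 30

// A: tag every row with a random salt in [0, 30)
// (in the real job A already carries such a random_val column)
val saltedA = browserData
  .withColumn("random_val", (rand() * numSalts).cast("int"))

// B: replicate every row 30 times, once per salt value,
// which is what the Cartesian product with the 0..29 values achieves
val saltedB = identityDf
  .withColumn("random_val", explode(array((0 until numSalts).map(lit): _*)))

// Join on the original key plus the salt, so the hot key in A is
// spread over 30 shuffle partitions instead of one
val joined = saltedA.join(saltedB, Seq("party_id", "random_val"), "left")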

C is 900 MB with 9 million records. It is joined with A using a broadcast join (so no shuffle).

The above query works well. But when I look at the Spark UI, I can see that it triggers a shuffle read of 6.8 TB. Since datasets D and C are joined via broadcast, they should not cause any shuffle, so only the join of A and B should shuffle data. Even if we assume all of that data gets shuffled, the read should be limited to 110 GB (A) + 150 GB (B) = 260 GB. Why is it triggering 6.8 TB of shuffle read and 40 GB of shuffle write? Any help is appreciated. Thank you in advance.

Thanks,

Manish

Answer

The first thing I would do is use DataFrame.explain on it. That will show you the execution plan, so you can see exactly what is actually happening. I would check the output to confirm that the broadcast join is really happening; Spark has a setting that controls how big your data can be before it gives up on doing a broadcast join.
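
For example, a minimal sketch (assuming the query string above is held in a variable named sql and the tables are registered as temporary views):

// Inspect the physical plan and the broadcast size threshold.
val df = spark.sql(sql)

// Extended plan: look for BroadcastHashJoin on the C and D joins, and for
// SortMergeJoin preceded by Exchange (i.e. a shuffle) on the A-B join.
df.explain(true)

// Size limit in bytes below which Spark will broadcast a table
// (default is 10 MB; -1 disables broadcast joins entirely).
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// If a table you expect to be broadcast is above the limit,
// the threshold can be raised, e.g. to 1 GB:
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1024L * 1024 * 1024)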

I would also note that your INNER JOIN against random_distribution looks suspect. I may have recreated your schema wrong, but when I ran explain I got this:

scala> spark.sql(sql).explain
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
LocalRelation [party_id#99]
and
LocalRelation [random_val#117]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
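
If the same thing shows up in your plan, the replication step is a true Cartesian product with no usable join condition, and Spark wants you to say so explicitly. A minimal sketch of the two usual ways to handle it (Spark 2.x; table names taken from the query above):

// Option 1: use explicit CROSS JOIN syntax for the replication step
val replicatedB = spark.sql(
  """SELECT B1.*, B2.random_val
    |FROM identity B1
    |CROSS JOIN (SELECT random_val FROM random_distribution) B2""".stripMargin)

// Option 2: allow implicit cartesian products session-wide (use with care,
// since it also silences the check for genuinely accidental cross joins)
// spark.conf.set("spark.sql.crossJoin.enabled", "true")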

Finally, is your input data compressed? You may be seeing the size differences because of a combination of your data no longer being compressed and the way it is being serialized.
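
As a quick sanity check on that point, these are the standard settings that control whether the shuffle data itself is written compressed (a sketch, assuming an existing SparkSession named spark; the fallback values shown are the usual defaults):

// Whether shuffle output files and shuffle spills are compressed, and with which codec.
println(spark.conf.get("spark.shuffle.compress", "true"))
println(spark.conf.get("spark.shuffle.spill.compress", "true"))
println(spark.conf.get("spark.io.compression.codec", "lz4"))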
