Understanding Spark physical plan


Question

I'm trying to understand physical plans in Spark, but I'm not understanding some parts because they seem different from a traditional RDBMS. For example, the plan below is for a query over a Hive table. The query is:

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-09-16'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;


== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
            +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
               +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
                  +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
                     +- Filter (l_shipdate#37 <= 1998-09-16)
                        +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None
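
A plan like this can be printed with DataFrame.explain(). A minimal sketch, assuming the Spark 1.x API and a HiveContext named sqlContext with the lineitem table available in Hive:

val df = sqlContext.sql("""
  select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty
  from lineitem
  where l_shipdate <= '1998-09-16'
  group by l_returnflag, l_linestatus
  order by l_returnflag, l_linestatus""")

df.explain()      // prints the physical plan only
df.explain(true)  // prints the logical, optimized and physical plans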

My understanding of the plan is:

  1. First it starts with a Hive table scan
  2. Then it filters using the WHERE condition
  3. Then it projects to get the columns we want
  4. Then TungstenAggregate?
  5. Then TungstenExchange?
  6. Then TungstenAggregate again?
  7. Then ConvertToSafe?
  8. Then it sorts the final result

But I'm not understanding steps 4, 5, 6 and 7. Do you know what they are? I'm looking for information about this so I can understand the plan, but I'm not finding anything concrete.

Answer

Let's look at the structure of the SQL query you use:

SELECT
    ...  -- not aggregated columns  #1
    ...  -- aggregated columns      #2
FROM
    ...                          -- #3
WHERE
    ...                          -- #4
GROUP BY
    ...                          -- #5
ORDER BY
    ...                          -- #6

As you already suspect:

  • Filter (...) corresponds to the predicates in the WHERE clause (#4)
  • Project ... limits the columns to those required by a union of #1 and #2, plus #4 / #6 if not present in SELECT
  • HiveTableScan corresponds to the FROM clause (#3)

The rest can be attributed as follows:

  • #2 from the SELECT clause - the functions field in the TungstenAggregates
  • the GROUP BY clause (#5):
    • TungstenExchange / hash partitioning
    • the key field in the TungstenAggregates
  • #6 - the ORDER BY clause
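
To see the mapping end to end, here is a hypothetical DataFrame version of a trimmed-down form of the query (standard Spark SQL API; the sqlContext name and the column subset are assumptions), where each method call lines up with one of the operators above:

import org.apache.spark.sql.functions.{avg, col, count, lit, sum}

val result = sqlContext.table("lineitem")        // HiveTableScan (#3)
  .filter(col("l_shipdate") <= "1998-09-16")     // Filter (#4)
  .groupBy("l_returnflag", "l_linestatus")       // key in TungstenAggregate (#5)
  .agg(                                          // functions in TungstenAggregate (#2)
    sum("l_quantity").as("sum_qty"),
    avg("l_discount").as("avg_disc"),
    count(lit(1)).as("count_order"))
  .orderBy("l_returnflag", "l_linestatus")       // Sort / range partitioning (#6)

result.explain()  // should show the same operator shape as the SQL plan above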

Project Tungsten in general describes a set of optimizations used by Spark DataFrames (-sets), including:

  • explicit memory management with sun.misc.Unsafe. It means "native" (off-heap) memory usage and explicit memory allocation / freeing outside of GC management. These conversions correspond to the ConvertToUnsafe / ConvertToSafe steps in the execution plan. You can learn some interesting details about Unsafe from Understanding sun.misc.Unsafe
  • code generation - different meta-programming tricks designed to generate code that is better optimized during compilation. You can think of it as an internal Spark compiler which does things like rewriting nice functional code into ugly for loops.
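
To make the off-heap idea concrete, here is a minimal standalone sun.misc.Unsafe sketch (plain JVM code with toy values, not Spark's internal implementation):

import sun.misc.Unsafe

object UnsafeSketch {
  def main(args: Array[String]): Unit = {
    // The constructor is private, so grab the singleton via reflection.
    val field = classOf[Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    val unsafe = field.get(null).asInstanceOf[Unsafe]

    // Allocate 8 bytes off-heap, write and read a long, then free them.
    // None of this is tracked by the GC: forgetting freeMemory leaks.
    val address = unsafe.allocateMemory(8L)
    unsafe.putLong(address, 42L)
    println(unsafe.getLong(address)) // 42
    unsafe.freeMemory(address)
  }
}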

You can learn more about Tungsten in general from Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter provides some examples of code generation.

TungstenAggregate occurs twice because data is first aggregated locally on each partition, then shuffled, and finally merged. If you are familiar with the RDD API, this process is roughly equivalent to reduceByKey.
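
A rough RDD analogue of the two-phase aggregation, as a sketch over assumed toy data: reduceByKey first combines values inside each partition (like mode=Partial), shuffles only the per-partition results, then merges them per key (like mode=Final):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("two-phase").setMaster("local[2]"))

// Toy (l_returnflag, l_quantity) pairs standing in for lineitem rows.
val rows = sc.parallelize(Seq(
  ("A", 17.0), ("N", 36.0), ("A", 8.0), ("R", 28.0), ("N", 24.0)))

// Map-side combine inside each partition, shuffle, then the final merge.
val sumQtyByFlag = rows.reduceByKey(_ + _)
sumQtyByFlag.collect().foreach(println)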

If the execution plan is not clear, you can also try to convert the resulting DataFrame to an RDD and analyze the output of toDebugString.
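
For example (a sketch assuming the Spark 1.x API and a HiveContext named sqlContext):

// df.rdd materializes the query as an RDD[Row]; toDebugString prints its
// lineage, including the shuffle boundaries that explain() shows as exchanges.
val df = sqlContext.sql(
  "select l_returnflag, count(*) as cnt from lineitem group by l_returnflag")

println(df.rdd.toDebugString)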
