Understanding Spark physical plan


Question


I'm trying to understand physical plans on spark but I'm not understanding some parts because they seem different from traditional rdbms. For example, in this plan below, it's a plan about a query over a hive table. The query is this:

select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus;


== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
            +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
               +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
                  +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
                     +- Filter (l_shipdate#37 <= 1998-09-16)
                        +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None

What I understand in the plan is:

  1. First starts with a Hive table scan
  2. Then filters using the condition
  3. Then projects to get the columns we need
  4. Then TungstenAggregate?
  5. Then TungstenExchange?
  6. Then TungstenAggregate again?
  7. Then ConvertToSafe?
  8. Then sorts the final result

But I'm not understanding steps 4, 5, 6 and 7. Do you know what they are? I'm looking for information about this so I can understand the plan, but I'm not finding anything concrete.

Answer

Let's look at the structure of the SQL query you use:

SELECT
    ...  -- not aggregated columns  #1
    ...  -- aggregated columns      #2
FROM
    ...                          -- #3
WHERE
    ...                          -- #4
GROUP BY
    ...                          -- #5
ORDER BY
    ...                          -- #6

As you already suspect:

  • Filter (...) corresponds to the predicates in the WHERE clause (#4)
  • Project ... limits the number of columns to the union of (#1 and #2, plus #4 / #6 if not present in SELECT)
  • HiveTableScan corresponds to the FROM clause (#3)
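The correspondence above can be illustrated with a minimal pure-Python sketch (not Spark code; the row values are made up) of how the bottom of the plan processes rows: scan, then Filter on the WHERE predicate (#4), then Project to prune columns:

```python
# Pure-Python analogy of HiveTableScan -> Filter -> Project.
rows = [
    {"l_quantity": 17, "l_shipdate": "1998-09-01", "l_returnflag": "N", "l_comment": "x"},
    {"l_quantity": 36, "l_shipdate": "1998-10-01", "l_returnflag": "R", "l_comment": "y"},
]

def table_scan(rows):
    # HiveTableScan: simply yields rows from the source table
    yield from rows

def filter_rows(rows, predicate):
    # Filter: keeps only rows satisfying the WHERE predicate
    return (r for r in rows if predicate(r))

def project(rows, columns):
    # Project: drops columns not needed further up the plan
    return [{c: r[c] for c in columns} for r in rows]

result = project(
    filter_rows(table_scan(rows), lambda r: r["l_shipdate"] <= "1998-09-16"),
    ["l_quantity", "l_returnflag"],
)
# Only the first row passes the date filter, and l_comment is pruned away.
```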

The remaining parts can be attributed as follows:

  • #2 from the SELECT clause - the functions field in TungstenAggregates
  • GROUP BY clause (#5):
    • TungstenExchange / hash partitioning
    • the key field in TungstenAggregates
  • ORDER BY clause (#6) - the Sort step.
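What the hash partitioning in TungstenExchange guarantees can be sketched in pure Python. This is an analogy, not Spark's actual implementation (Spark uses its own Murmur3-based hash, stood in for here by Python's `hash()`); the 200 matches the partition count shown in the plan:

```python
# Analogy for hashpartitioning(l_returnflag, l_linestatus, 200):
# every row with the same GROUP BY key lands in the same partition.
NUM_PARTITIONS = 200

def partition_for(key, num_partitions=NUM_PARTITIONS):
    # Python's hash() stands in for Spark's Murmur3-based partitioner.
    return hash(key) % num_partitions

k1 = ("N", "O")  # (l_returnflag, l_linestatus)
k2 = ("N", "O")
assert partition_for(k1) == partition_for(k2)  # same key -> same partition
```

Because equal keys always map to the same partition, the final aggregation can be completed locally within each partition after the shuffle.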

Project Tungsten in general describes a set of optimizations used by Spark DataFrames (and Datasets), including:

  • explicit memory management with sun.misc.Unsafe. This means "native" (off-heap) memory usage and explicit memory allocation / freeing outside GC management. These conversions correspond to the ConvertToUnsafe / ConvertToSafe steps in the execution plan. You can learn some interesting details about unsafe from Understanding sun.misc.Unsafe.
  • code generation - different meta-programming tricks designed to generate code that is better optimized during compilation. You can think of it as an internal Spark compiler which does things like rewriting nice functional code into ugly for loops.

You can learn more about Tungsten in general from Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter provides some examples of code generation.

TungstenAggregate occurs twice because data is first aggregated locally on each partition, then shuffled, and finally merged. If you are familiar with the RDD API, this process is roughly equivalent to reduceByKey.

If the execution plan is not clear, you can also try to convert the resulting DataFrame to an RDD and analyze the output of toDebugString.
