Understanding Spark physical plan
Question
I'm trying to understand physical plans in Spark, but I'm not understanding some parts because they seem different from a traditional RDBMS. For example, the plan below is for a query over a Hive table. The query is this:
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
+- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
+- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
+- Filter (l_shipdate#37 <= 1998-09-16)
+- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None
My understanding of the plan is:

First starts with a Hive table scan
Then it filters using the WHERE condition
Then it projects to get the columns we want
Then TungstenAggregate?
Then TungstenExchange?
Then TungstenAggregate again?
Then ConvertToSafe?
Then it sorts the final result
But I'm not understanding steps 4, 5, 6 and 7. Do you know what they are? I'm looking for information about this so I can understand the plan, but I'm not finding anything concrete.
Answer
Let's look at the structure of the SQL query you use:
SELECT
... -- not aggregated columns #1
... -- aggregated columns #2
FROM
... -- #3
WHERE
... -- #4
GROUP BY
... -- #5
ORDER BY
... -- #6
As you already suspect:

Filter (...) corresponds to the predicates in the WHERE clause (#4)
Project ... limits the number of columns to the union of those required by #1 and #2, plus the columns from #4 / #6 if they are not present in SELECT
HiveTableScan corresponds to the FROM clause (#3)
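The scan → filter → project sequence at the bottom of the plan can be sketched in plain Python (the rows and the set of retained columns are hypothetical illustrations, not Spark API):

```python
# Plain-Python sketch of HiveTableScan -> Filter -> Project.
# Hypothetical rows; Spark operates on its own row format, not dicts.
lineitem = [
    {"l_returnflag": "A", "l_linestatus": "F", "l_quantity": 17.0,
     "l_extendedprice": 100.0, "l_discount": 0.05, "l_tax": 0.02,
     "l_shipdate": "1998-01-01"},
    {"l_returnflag": "N", "l_linestatus": "O", "l_quantity": 5.0,
     "l_extendedprice": 50.0, "l_discount": 0.10, "l_tax": 0.01,
     "l_shipdate": "1998-12-01"},
]

# HiveTableScan: read every row of the table.
scanned = iter(lineitem)

# Filter: keep rows matching the WHERE predicate (#4).
filtered = (r for r in scanned if r["l_shipdate"] <= "1998-09-16")

# Project: keep only the columns required downstream (#1 and #2);
# l_shipdate was only needed by the Filter, so it is dropped here.
needed = ["l_returnflag", "l_linestatus", "l_quantity",
          "l_extendedprice", "l_discount", "l_tax"]
projected = [{c: r[c] for c in needed} for r in filtered]
```

Note that the projected rows no longer carry `l_shipdate`, which matches the Project column list in the plan above.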
The rest can be summarized as follows:

#2 from the SELECT clause - the functions field in the TungstenAggregates
GROUP BY clause (#5) - TungstenExchange / hash partitioning, and the key field in the TungstenAggregates
#6 - the ORDER BY clause.
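The hash partitioning performed by TungstenExchange routes rows with equal grouping keys to the same partition. A minimal sketch, using Python's built-in `hash()` as a stand-in for Spark's internal hash function (Spark actually uses Murmur3), with 200 partitions matching `hashpartitioning(l_returnflag#35,l_linestatus#36,200)` in the plan:

```python
NUM_PARTITIONS = 200  # matches hashpartitioning(..., 200) in the plan

def partition_for(key):
    # Stand-in for Spark's partitioner: Python's hash() replaces Murmur3.
    # In Python, % always returns a non-negative result for a positive modulus.
    return hash(key) % NUM_PARTITIONS

# Rows sharing a GROUP BY key always land in the same partition, so the
# final (mode=Final) aggregation can run per partition without re-shuffling.
p = partition_for(("A", "F"))
```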
Project Tungsten in general describes a set of optimizations used by Spark DataFrames (-sets) including:
- explicit memory management with sun.misc.Unsafe. It means "native" (off-heap) memory usage and explicit memory allocation / freeing outside GC management. These conversions correspond to the ConvertToUnsafe / ConvertToSafe steps in the execution plan. You can learn some interesting details about unsafe from Understanding sun.misc.Unsafe.
- code generation - different meta-programming tricks designed to generate code that is better optimized during compilation. You can think of it as an internal Spark compiler which does things like rewriting nice functional code into ugly for loops.
You can learn more about Tungsten in general from Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter provides some examples of code generation.
TungstenAggregate occurs twice because data is first aggregated locally on each partition, then shuffled, and finally merged. If you are familiar with the RDD API this process is roughly equivalent to reduceByKey.
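The partial/final split can be mimicked in plain Python (hypothetical data; Spark does this inside TungstenAggregate, not with code like this). Here `sum(l_quantity)` is aggregated per partition first (mode=Partial), then the partial sums are merged after the shuffle (mode=Final):

```python
from collections import defaultdict

# Two input partitions of (group_key, l_quantity) pairs -- hypothetical data.
partitions = [
    [(("A", "F"), 10.0), (("N", "O"), 5.0), (("A", "F"), 7.0)],
    [(("A", "F"), 3.0), (("N", "O"), 2.0)],
]

# Phase 1 -- mode=Partial: aggregate locally within each partition.
partial = []
for part in partitions:
    local = defaultdict(float)
    for key, qty in part:
        local[key] += qty
    partial.append(dict(local))

# (Here TungstenExchange would shuffle so equal keys share a partition.)

# Phase 2 -- mode=Final: merge the per-partition partial results.
final = defaultdict(float)
for local in partial:
    for key, s in local.items():
        final[key] += s
# final[("A", "F")] == 20.0, final[("N", "O")] == 7.0
```

Only the small partial maps cross the network, which is exactly why reduceByKey is preferred over groupByKey in the RDD API.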
If the execution plan is not clear you can also try to convert the resulting DataFrame to an RDD and analyze the output of toDebugString.