Data shuffle for Hive and Spark window function

Problem Description

Does a data shuffle occur when using a Hive window function on data that is already on the same node?

Specifically, in the following example, before the window function is used the data is already repartitioned by 'City' with Spark's repartition() function, which should ensure that all data for city 'A' is co-located on the same node (assuming the data for one city can fit on a single node).

df = sqlContext.createDataFrame(
    [('A', '1', 2009, "data1"),
     ('A', '1', 2015, "data2"),
     ('A', '22', 2015, "data3"),
     ('A', '22', 2016, "data4"),
     ('BB', '333', 2014, "data5"), 
     ('BB', '333', 2012, "data6"), 
     ('BB', '333', 2016, "data7")
    ],
    ("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# |  BB|   333|2012|data6|
# |  BB|   333|2014|data5|
# |  BB|   333|2016|data7|
# |   A|    22|2016|data4|
# |   A|    22|2015|data3|
# |   A|     1|2009|data1|
# |   A|     1|2015|data2|
# +----+------+----+-----+
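
To verify this co-location claim, here is a small sketch (assuming Spark 1.6+, where spark_partition_id is available in pyspark.sql.functions) that tags each row with the id of the partition it lives in:

from pyspark.sql.functions import spark_partition_id

# After repartition(2, 'City'), all rows of a given city should report
# the same partition id.
df.withColumn("pid", spark_partition_id()).show()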

Then I apply a window function partitioned by 'Person', which is not the partition key used in the Spark repartition() call, as follows.

df.registerTempTable('example')
sqlStr = """\
    select *,
        row_number() over (partition by Person order by year desc) ranking
    from example
"""
sqlContext.sql(sqlStr).show(100)

# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# |  BB|   333|2016|data7|      1|
# |  BB|   333|2014|data5|      2|
# |  BB|   333|2012|data6|      3|
# |   A|     1|2015|data2|      1|
# |   A|     1|2009|data1|      2|
# |   A|    22|2016|data4|      1|
# |   A|    22|2015|data3|      2|
# +----+------+----+-----+-------+

Here are my questions:


  1. Is there any relation or difference between Spark repartition() and Hive "partition by"? Under the hood, are they translated to the same thing in Spark?

  2. I want to check whether my understanding is correct: even if all the data is already on the same node, calling Spark df.repartition('A_key_different_from_current_partition_key') will shuffle the data to many nodes, instead of keeping it together on the same node.

By the way, I am also curious whether it is simple to implement the example Hive query using Spark's window functions.

Answer

Both the partition by clause in window functions and repartition are executed using the same TungstenExchange mechanism. You can see this when you analyze the execution plan:

sqlContext.sql(sqlStr).explain()

## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
##    +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
##       +- Project [City#0,Person#1,year#2L,data#3]
##          +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
##             +- ConvertToUnsafe
##                +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]

Regarding the second question, your assumption is correct. Even if the data is already located on a single node, Spark has no a priori knowledge of the data distribution and will shuffle the data once again.
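
A minimal sketch to confirm this: repartitioning df (already partitioned by 'City') by a different key introduces another exchange on top of the existing one, which shows up in the physical plan:

# Repartition by a key other than the current one ('City'); the plan
# gains a second hashpartitioning exchange over the first one.
df.repartition(2, 'Person').explain()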

Finally, depending on your point of view, your query either is already a Spark query, or it is impossible to execute it using plain Spark:


  • It is a Spark query, because the DSL counterpart will use exactly the same mechanisms:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

w = Window.partitionBy("person").orderBy(col("year").desc())
df.withColumn("ranking", row_number().over(w))
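
Calling .show() on the result should produce the same ranking output as the SQL version above, and its explain() output contains the same exchange on Person.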


  • It is impossible to execute it using plain Spark, because as of Spark 1.6 there is no native implementation of window functions in Spark (note the HiveWindowFunction in the plan above). This changed in Spark 2.0.
