Recursive CTE in Spark SQL


Problem description

;WITH Hierarchy AS
(
    SELECT DISTINCT PersonnelNumber
         , Email
         , ManagerEmail
    FROM dimstage
    UNION ALL
    SELECT e.PersonnelNumber
         , e.Email
         , e.ManagerEmail
    FROM dimstage e
    JOIN Hierarchy AS h ON e.Email = h.ManagerEmail
)
SELECT * FROM Hierarchy

Can you help achieve the same in Spark SQL?

Answer

This is quite late, but today I tried to implement the recursive CTE query using PySpark SQL. Spark SQL has no native recursive WITH, so the recursion has to be unrolled into a driver-side loop.

Here, I have this simple DataFrame. What I want to do is find the NEWEST ID of each ID.

Original DataFrame:

+-----+-----+
|OldID|NewID|
+-----+-----+
|    1|    2|
|    2|    3|
|    3|    4|
|    4|    5|
|    6|    7|
|    7|    8|
|    9|   10|
+-----+-----+

The result I want:

+-----+-----+
|OldID|NewID|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    6|    8|
|    7|    8|
|    9|   10|
+-----+-----+
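
For intuition, the mapping the two tables describe is plain chain-following: keep replacing an ID until you reach one that was never replaced. A tiny pure-Python sketch of the idea over the same data (the pairs dict and newest helper are just for illustration, not part of the answer's code):

pairs = {1: 2, 2: 3, 3: 4, 4: 5, 6: 7, 7: 8, 9: 10}  # OldID -> NewID

def newest(i):
    # Follow the replacement chain until the ID no longer appears as an OldID
    while i in pairs:
        i = pairs[i]
    return i

print({old: newest(old) for old in pairs})
# {1: 5, 2: 5, 3: 5, 4: 5, 6: 8, 7: 8, 9: 10}

The Spark code below does the same walk, but with joins, so it scales past driver memory.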

Here is my code:

from pyspark.sql.functions import broadcast  # needed for the broadcast join in the loop

df = sqlContext.createDataFrame([(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8), (9, 10)], "OldID integer, NewID integer").checkpoint().cache()

dfcheck = df.drop('NewID')
dfdistinctID = df.select('NewID').distinct()
dfidfinal = dfdistinctID.join(dfcheck, [dfcheck.OldID == dfdistinctID.NewID], how="left_anti")  # Find the IDs that were never replaced again, i.e. the terminal NewIDs

dfcurrent = df.join(dfidfinal, [dfidfinal.NewID == df.NewID], how="left_semi").checkpoint().cache()  # Keep the rows whose NewID is terminal; these seed the result
dfresult = dfcurrent
dfdifferentalias = df.select(df.OldID.alias('id1'), df.NewID.alias('id2')).checkpoint().cache()

while dfcurrent.count() > 0:
    # Walk one link back along the chain: any id1 that was replaced by a resolved OldID inherits that row's terminal NewID
    dfcurrent = dfcurrent.join(broadcast(dfdifferentalias), [dfcurrent.OldID == dfdifferentalias.id2], how="inner").select(dfdifferentalias.id1.alias('OldID'), dfcurrent.NewID.alias('NewID')).cache()
    dfresult = dfresult.unionAll(dfcurrent)

display(dfresult.orderBy('OldID'))  # display() is Databricks-specific
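
If you run this outside a Databricks notebook, two details need adjusting: checkpoint() requires a checkpoint directory, and display() does not exist. A minimal setup for a plain PySpark session might look like this (the app name and checkpoint path are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("recursive-cte-emulation").getOrCreate()  # placeholder app name
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # placeholder path; any writable location works
sqlContext = spark  # SparkSession.createDataFrame accepts the same arguments used above

# ...run the code above, then print instead of display():
dfresult.orderBy('OldID').show()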

(Databricks notebook screenshot)

I know that the performance is quite bad, but at least it gives the answer I need.
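
On the performance point: each pass of the loop resolves only one link, so a chain of length n costs n joins. A common alternative is pointer doubling, which composes the OldID -> NewID mapping with itself so the chain length roughly halves each round. This is a sketch of that idea, not code from the original answer; it assumes each OldID appears at most once and the chains have no cycles:

from pyspark.sql import functions as F

# Pointer doubling: after each round every OldID points twice as far along
# its chain, so the table converges in about log2(chain length) joins.
mapping = df  # columns: OldID, NewID (the input built above)
while True:
    nxt = (mapping.alias("a")
           .join(mapping.alias("b"), F.col("a.NewID") == F.col("b.OldID"), "left")
           .select(F.col("a.OldID"),
                   F.coalesce(F.col("b.NewID"), F.col("a.NewID")).alias("NewID"))
           .checkpoint())
    if nxt.exceptAll(mapping).rdd.isEmpty():  # fixpoint: no row advanced this round
        break
    mapping = nxt

display(mapping.orderBy('OldID'))

On the sample data this converges in a few rounds and yields the same seven rows as dfresult.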

This is the first time I have posted an answer on Stack Overflow, so forgive me if I made any mistakes.
