Spark DataFrame: find and set the main root for child
Question
I have the following Apache Spark DataFrame:
Parent - Child
A1 - A10
A1 - A2
A2 - A3
A3 - A4
A5 - A7
A7 - A6
A8 - A9
This DataFrame describes connections between parents and children. Logically it looks like this:
The main goal is to set the main root for each child, which means we should end up with the following DataFrame:
Parent - Child
A1 - A10
A1 - A2
A1 - A3
A1 - A4
A5 - A7
A5 - A6
A8 - A9
- Everything should be implemented with Apache Spark.
- The number of nodes is unlimited, meaning the algorithm should work regardless of how many nodes there are.
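For intuition, the required transformation can be sketched without Spark in a few lines of plain Scala. This is only an illustration of the goal on the sample edges above; the names `edges`, `parentOf`, and `root` are introduced here and are not part of any Spark API:

```scala
// Sample edges from the question, as (parent, child) pairs.
val edges = List("A1" -> "A10", "A1" -> "A2", "A2" -> "A3",
                 "A3" -> "A4", "A5" -> "A7", "A7" -> "A6", "A8" -> "A9")

// child -> immediate parent lookup (each child has at most one parent here).
val parentOf: Map[String, String] = edges.map(_.swap).toMap

// Walk up the chain until a node with no recorded parent (a root) is reached.
def root(node: String): String =
  parentOf.get(node).map(root).getOrElse(node)

// Replace each immediate parent with the ultimate root.
edges.foreach { case (_, c) => println(s"${root(c)} - $c") }
```

The challenge, of course, is expressing this chain-walking over DataFrames, where per-row recursion is not available.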
Answer
I believe you can achieve it with the approach below.
val input_rdd = spark.sparkContext.parallelize(List(("A1", "A10"), ("A1", "A2"), ("A2", "A3"), ("A3", "A4"), ("A5", "A7"), ("A7", "A6"), ("A8", "A9"), ("A4", "A11"), ("A11", "A12"), ("A6", "A13")))
val input_df = input_rdd.toDF("Parent", "Child")
input_df.createOrReplaceTempView("TABLE1")
input_df.show()
Input
+------+-----+
|Parent|Child|
+------+-----+
| A1| A10|
| A1| A2|
| A2| A3|
| A3| A4|
| A5| A7|
| A7| A6|
| A8| A9|
| A4| A11|
| A11| A12|
| A6| A13|
+------+-----+
// linkchild function to get the root
import org.apache.spark.sql.DataFrame
def linkchild(df: DataFrame): DataFrame = {
df.createOrReplaceTempView("TEMP")
val link_child_df = spark.sql("""select distinct a.parent, b.child from TEMP a inner join TEMP b on a.parent = b.parent or a.child = b.parent""")
link_child_df
}
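To see what a single `linkchild` pass computes, here is a plain-Scala analogue of that self-join (the `step` name is introduced here for illustration only): it keeps existing edges and extends each edge by one hop, exactly as the SQL `ON` clause does.

```scala
// One-hop expansion step mirroring the SQL:
//   a.parent = b.parent keeps existing edges,
//   a.child  = b.parent rewires grandchildren onto the grandparent.
def step(edges: Set[(String, String)]): Set[(String, String)] =
  for {
    (p1, c1) <- edges
    (p2, c2) <- edges
    if p1 == p2 || c1 == p2
  } yield (p1, c2)

// One pass derives A1 -> A3 from A1 -> A2 and A2 -> A3.
val once = step(Set("A1" -> "A2", "A2" -> "A3"))
```

Each call shortens every chain by at least one hop, which is why repeating it eventually reaches the roots.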
// findroot function to validate and generate output
def findroot(rdf: DataFrame): Unit = {
val link_child_df = linkchild(rdf)
link_child_df.createOrReplaceTempView("TEMP1")
val cnt = spark.sql("""select * from table1 where child not in (select child from (select * from (select distinct a.parent, b.child from TEMP1 a inner join TEMP1 b on a.parent = b.parent or a.child = b.parent
where a.parent not in(select distinct child from TABLE1))))""").count()
if (cnt == 0) {
spark.sql("""select * from (select distinct a.parent, b.child from TEMP1 a inner join TEMP1 b on a.parent = b.parent or a.child = b.parent
where a.parent not in(select distinct child from TABLE1)) order by parent, child""").show
} else {
findroot(link_child_df)
}
}
// Calling findroot for the first time with input_df, which in turn calls linkchild until it reaches the target
findroot(input_df)
Output
+------+-----+
|parent|child|
+------+-----+
| A1| A10|
| A1| A11|
| A1| A12|
| A1| A2|
| A1| A3|
| A1| A4|
| A5| A13|
| A5| A6|
| A5| A7|
| A8| A9|
+------+-----+
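The recursion above can be mirrored as a plain-Scala fixed point, which is a convenient way to sanity-check the SQL: expand edges until nothing new appears, then keep only rows whose parent never occurs as a child (the names `expand`, `closure`, and `rootEdges` are illustrative, not part of the answer's code):

```scala
type Edge = (String, String)

// Add one-hop extensions: (p, c1) and (c1, c2) yield (p, c2).
def expand(edges: Set[Edge]): Set[Edge] =
  edges ++ (for {
    (p1, c1) <- edges
    (p2, c2) <- edges
    if c1 == p2
  } yield (p1, c2))

// Iterate until no new edges appear (transitive closure).
@annotation.tailrec
def closure(edges: Set[Edge]): Set[Edge] = {
  val next = expand(edges)
  if (next == edges) edges else closure(next)
}

// Keep only rows whose parent is a true root (never anyone's child).
def rootEdges(edges: Set[Edge]): Set[Edge] = {
  val children = edges.map(_._2)
  closure(edges).filter { case (p, _) => !children.contains(p) }
}
```

Note that the Spark version re-joins the full edge set on every recursive call, so its cost grows with chain depth; for very deep or very large graphs, an approach such as GraphX's `connectedComponents` (plus a per-component root lookup) or checkpointed iterative joins may scale better, though that is beyond what the answer shows.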