Performing a join on multiple DataFrames in Spark
Question
I have 3 DataFrames generated by 3 different processes. Each DataFrame has columns with the same names. My DataFrames look like this:
id  val1  val2  val3  val4
1   null  null  null  null
2   A2    A21   A31   A41

id  val1  val2  val3  val4
1   B1    B21   B31   B41
2   null  null  null  null

id  val1  val2  val3  val4
1   C1    C2    C3    C4
2   C11   C12   C13   C14
Out of these 3 DataFrames, I want to create two DataFrames (final and consolidated). For the final one, the order of preference is DataFrame 1 > DataFrame 2 > DataFrame 3.
If a result is present in DataFrame 1 (val1 != null), I will store that row in the final DataFrame.
My final result should be:
id  finalVal1  finalVal2  finalVal3  finalVal4
1   B1         B21        B31        B41
2   A2         A21        A31        A41
The consolidated DataFrame will store the results from all 3.
How can I do that efficiently?
Answer
If I understood you correctly, for each row you want to find the first non-null value, looking first into the first table, then the second table, then the third table.
You simply need to join these three tables on id and then use the coalesce function to get the first non-null element:
import org.apache.spark.sql.functions._

val df1 = sc.parallelize(Seq(
  (1, null, null, null, null),
  (2, "A2", "A21", "A31", "A41"))
).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
  (1, "B1", "B21", "B31", "B41"),
  (2, null, null, null, null))
).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
  (1, "C1", "C2", "C3", "C4"),
  (2, "C11", "C12", "C13", "C14"))
).toDF("id", "val1", "val2", "val3", "val4")

// Join all three on id, then take the first non-null value per column,
// honoring the preference order df1 > df2 > df3.
val consolidated = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)
Which gives you the expected output:
+---+---------+---------+---------+---------+
| id|finalVal1|finalVal2|finalVal3|finalVal4|
+---+---------+---------+---------+---------+
|  1|       B1|      B21|      B31|      B41|
|  2|       A2|      A21|      A31|      A41|
+---+---------+---------+---------+---------+
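To make the coalesce semantics concrete, here is a minimal plain-Scala sketch, independent of Spark. The helper firstNonNull is hypothetical (not a Spark API); it mirrors what coalesce does per row: scan the candidate values in preference order and keep the first non-null one.

```scala
// Sketch of coalesce's per-row behavior: return the first non-null
// value among the candidates, in preference order (df1 > df2 > df3).
def firstNonNull(vals: String*): Option[String] =
  vals.find(_ != null)

// Row id=1: df1 holds null, df2 holds B1, df3 holds C1 -> B1 wins
val row1 = firstNonNull(null, "B1", "C1")

// Row id=2: df1 already holds A2 -> A2 wins
val row2 = firstNonNull("A2", null, "C11")
```

This also shows why the preference order matters: as soon as an earlier DataFrame supplies a value, the later ones are never consulted for that column.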