Perform join on multiple DataFrames in Spark


Question

I have 3 DataFrames generated from 3 different processes. Each DataFrame has columns with the same names. My DataFrames look like this:

id   val1    val2       val3    val4
 1    null   null       null    null
 2    A2      A21       A31      A41

id   val1      val2       val3      val4
 1    B1        B21        B31       B41
 2    null      null       null      null

id   val1     val2       val3    val4
 1    C1       C2        C3       C4
 2    C11      C12       C13      C14

Out of these 3 DataFrames, I want to create two DataFrames (final and consolidated). For final, the order of preference is: DataFrame 1 > DataFrame 2 > DataFrame 3.

If a result is present in DataFrame 1 (val1 != null), I will store that row in the final DataFrame.

My final result should be:

id  finalVal1    finalVal2   finalVal3   finalVal4 
1     B1           B21         B31         B41
2     A2           A21         A31         A41

The consolidated DataFrame will store the results from all 3.

How can I do that efficiently?

Answer

If I understood you correctly, for each row you want to find the first non-null values, first by looking into the first table, then the second table, then the third table.

You simply need to join these three tables on id and then use the coalesce function to get the first non-null element:

import org.apache.spark.sql.functions._

// Build the three sample DataFrames from the question
// (assumes a spark-shell session, where `sc` and the implicits needed
// for toDF are already in scope).
val df1 = sc.parallelize(Seq(
    (1, null, null, null, null),
    (2, "A2", "A21", "A31", "A41"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
    (1, "B1", "B21", "B31", "B41"),
    (2, null, null, null, null))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
    (1, "C1", "C2", "C3", "C4"),
    (2, "C11", "C12", "C13", "C14"))
  ).toDF("id", "val1", "val2", "val3", "val4")

// Join on id, then let coalesce pick the first non-null value per column,
// in the preference order df1 > df2 > df3.
val consolidated = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)

This gives you the expected output:

+---+---------+---------+---------+---------+
| id|finalVal1|finalVal2|finalVal3|finalVal4|
+---+---------+---------+---------+---------+
|  1|       B1|      B21|      B31|      B41|
|  2|       A2|      A21|      A31|      A41|
+---+---------+---------+---------+---------+
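(Note that the snippet above names its result consolidated, even though it implements the "final" preference logic from the question.) The question also asks for a consolidated DataFrame that stores the results from all 3 sources. Its exact shape isn't specified, but since the three inputs share the same schema, one straightforward reading is to simply stack them. This is a minimal sketch under that interpretation; consolidatedAll is a name introduced here, and union assumes Spark 2+ (Spark 1.x used unionAll):

// One reading of the "consolidated" DataFrame: stack all rows from the
// three sources, which share the same schema.
// union requires Spark 2+; on Spark 1.x use unionAll instead.
val consolidatedAll = df1.union(df2).union(df3)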

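One caveat: the inner joins above assume every id appears in all three DataFrames; a row whose id is missing from any one of them is silently dropped. If that can happen in your data, a full outer join keeps those rows. This is a sketch under that assumption, reusing df1/df2/df3 from above; finalOuter is a name introduced here:

// Variant: full outer joins keep ids that are absent from some of the
// three DataFrames, which the inner joins above would drop.
val finalOuter = df1
  .join(df2, Seq("id"), "outer")
  .join(df3, Seq("id"), "outer")
  .select(
    col("id"),
    coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
    coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
    coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
    coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
  )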
