Dynamically select multiple columns while joining different Dataframes in Scala Spark


Question

I have two Spark DataFrames, df1 and df2. Is there a way to select the output columns dynamically while joining these two DataFrames? The definition below outputs all columns from df1 and df2 in the case of an inner join.

def joinDF(df1: DataFrame, df2: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExprs, joinType)
  dfJoinResult
  //.select()
}

Input data:

val df1 = List(("1","new","current"), ("2","closed","saving"), ("3","blocked","credit")).toDF("id","type","account")
val df2 = List(("1","7"), ("2","5"), ("5","8")).toDF("id","value")

Expected result:

val dfJoinResult = df1
  .join(df2, df1("id") === df2("id"), "inner")
  .select(df1("type"), df1("account"), df2("value")) 

dfJoinResult.schema (note that schema is a val, not a method, so no parentheses):

StructType(StructField(type,StringType,true), 
StructField(account,StringType,true), 
StructField(value,StringType,true))

I have looked at options like df.select(cols.head, cols.tail: _*), but that does not allow selecting columns from both DataFrames. Is there a way to pass the select columns dynamically, together with which DataFrame each column should come from, into my def? I'm using Spark 2.2.0.
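As an aside, the `cols.head, cols.tail: _*` idiom mentioned above exists because Spark's String-based select is declared roughly as `select(col: String, cols: String*)`, so a `Seq[String]` must be split into a first argument plus a varargs-expanded tail. A minimal plain-Scala sketch (no Spark required; the local `select` is a stand-in, not the real API) shows the expansion:

```scala
// Stand-in with the same shape as Spark's String-overloaded select:
// a required first column plus varargs for the rest.
def select(col: String, cols: String*): Seq[String] = col +: cols

val cols = Seq("type", "account", "value")

// A Seq cannot be passed to varargs directly, so split it:
val picked = select(cols.head, cols.tail: _*)
println(picked.mkString(", "))  // type, account, value
```

The `Column`-based overload `select(cols: Column*)` has no leading required argument, which is why a `Seq[Column]` can be expanded directly with `selectExpr: _*` in the answer below's first variant.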

Answer

It is possible to pass the select expression as a Seq[Column] to the method:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column, joinType: String,
           selectExpr: Seq[Column]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr: _*)
}

Call the method with:

val joinExpr = df1.col("id") === df2.col("id")
val selectExpr = Seq(df1.col("type"), df1.col("account"), df2.col("value"))

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

This gives the desired result:

+------+-------+-----+
|  type|account|value|
+------+-------+-----+
|   new|current|    7|
|closed| saving|    5|
+------+-------+-----+


In the selectExpr above, it is necessary to specify which DataFrame each column comes from. However, this can be simplified further if the following assumptions hold:

  1. The columns to join on have the same name in both DataFrames
  2. The columns to be selected have unique names (the other DataFrame does not have a column with the same name)

In this case, joinExpr: Column can be changed to joinExpr: Seq[String], and selectExpr: Seq[Column] to selectExpr: Seq[String]:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Seq[String], joinType: String,
           selectExpr: Seq[String]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr.head, selectExpr.tail: _*)
}

Calling the method now looks cleaner:

val joinExpr = Seq("id")
val selectExpr = Seq("type", "account", "value")

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

Note: When the join is performed using a Seq[String], the column names of the resulting DataFrame will differ from those produced by a join expression: the join columns appear only once instead of twice. If columns with the same name are still present in the result, there is no way to select them separately afterwards.
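When the expression-based join is kept and duplicate column names remain, aliasing the DataFrames is one way to disambiguate. The sketch below illustrates this with the question's df1/df2; it is a sketch, not part of the accepted answer, and assumes a SparkSession named spark is in scope:

```scala
import org.apache.spark.sql.functions.col

// Alias each side so columns can be referenced by a qualified name,
// even when both frames contain an "id" column.
val left  = df1.alias("l")
val right = df2.alias("r")

val joined = left.join(right, col("l.id") === col("r.id"), "inner")

// "l.id" and "r.id" are now distinguishable despite the shared name.
val result = joined.select(col("l.id"), col("l.type"), col("r.value"))
```

This keeps both copies of the join column addressable, at the cost of slightly more verbose column references.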

