Dynamically select multiple columns while joining different DataFrames in Scala Spark
Question
I have two Spark DataFrames, df1 and df2. Is there a way to select the output columns dynamically while joining these two DataFrames? The definition below outputs all columns from df1 and df2 in the case of an inner join.
def joinDF(df1: DataFrame, df2: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExprs, joinType)
  dfJoinResult
  // .select() // how to select columns dynamically here?
}
Input data:
val df1 = List(("1","new","current"), ("2","closed","saving"), ("3","blocked","credit")).toDF("id","type","account")
val df2 = List(("1","7"), ("2","5"), ("5","8")).toDF("id","value")
Expected result:
val dfJoinResult = df1
.join(df2, df1("id") === df2("id"), "inner")
.select(df1("type"), df1("account"), df2("value"))
dfJoinResult.schema:
StructType(StructField(type,StringType,true),
StructField(account,StringType,true),
StructField(value,StringType,true))
I have looked at options like df.select(cols.head, cols.tail: _*), but that does not allow selecting columns from both DataFrames. Is there a way to pass the selectExpr columns dynamically, along with which DataFrame each column should come from, to my def? I'm using Spark 2.2.0.
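For reference, the varargs pattern mentioned above does work when all columns come from a single DataFrame; the limitation is that the String-based overload of select cannot say which DataFrame a name belongs to. A minimal sketch, using the df1 defined above:

```scala
// Sketch: the String-based varargs select resolves names against
// one DataFrame only — "type" and "account" can only mean df1's columns.
val cols = Seq("type", "account")
val projected = df1.select(cols.head, cols.tail: _*)
```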
Answer
It is possible to pass the select expression as a Seq[Column] to the method:
def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column, joinType: String, selectExpr: Seq[Column]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr: _*)
}
To call the method, use:
val joinExpr = df1.col("id") === df2.col("id")
val selectExpr = Seq(df1.col("type"), df1.col("account"), df2.col("value"))
val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)
This gives the desired result:
+------+-------+-----+
| type|account|value|
+------+-------+-----+
| new|current| 7|
|closed| saving| 5|
+------+-------+-----+
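The same method works unchanged for other join types, since only the joinType argument differs. A sketch, reusing the joinExpr and selectExpr defined above:

```scala
// Sketch: with "left_outer", the unmatched df1 row
// ("3", "blocked", "credit") is kept, with value = null.
val leftDf = joinDF(df1, df2, joinExpr, "left_outer", selectExpr)
```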
In the selectExpr above, it is necessary to specify which DataFrame each column comes from. However, this can be simplified further if the following assumptions are true:
- The columns to join on have the same name in both DataFrames
- The columns to be selected have unique names (the other DataFrame does not have a column with the same name)
In this case, joinExpr: Column can be changed to joinExpr: Seq[String] and selectExpr: Seq[Column] to selectExpr: Seq[String]:
def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Seq[String], joinType: String, selectExpr: Seq[String]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr.head, selectExpr.tail: _*)
}
Calling the method now looks cleaner:
val joinExpr = Seq("id")
val selectExpr = Seq("type", "account", "value")
val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)
Note: when the join is performed using a Seq[String], the column names of the resulting DataFrame will differ from those produced with a join expression: the join column appears only once. And if both DataFrames contain non-join columns with the same name, there is no way to select them separately afterwards.
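One possible workaround for that last limitation (a sketch, not part of the original answer): aliasing each DataFrame before the join keeps same-named columns addressable through their alias.

```scala
import org.apache.spark.sql.functions.col

// Sketch: after aliasing, columns can be qualified as "a.…" / "b.…",
// so same-named columns from either side remain individually selectable.
val joined = df1.alias("a")
  .join(df2.alias("b"), col("a.id") === col("b.id"), "inner")
  .select(col("a.type"), col("a.account"), col("b.value"))
```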