Dynamically select multiple columns while joining different Dataframes in Scala Spark


Question


I have two Spark DataFrames, df1 and df2. Is there a way to select the output columns dynamically while joining these two DataFrames? The definition below outputs all columns from both df1 and df2 in the case of an inner join.

def joinDF(df1: DataFrame, df2: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExprs, joinType)
  dfJoinResult
  //.select()
}

Input data:

val df1 = List(("1","new","current"), ("2","closed","saving"), ("3","blocked","credit")).toDF("id","type","account")
val df2 = List(("1","7"), ("2","5"), ("5","8")).toDF("id","value")

Expected result:

val dfJoinResult = df1
  .join(df2, df1("id") === df2("id"), "inner")
  .select(df1("type"), df1("account"), df2("value")) 

dfJoinResult.schema:

StructType(StructField(type,StringType,true), 
StructField(account,StringType,true), 
StructField(value,StringType,true))


I have looked at options like df.select(cols.head, cols.tail: _*), but that does not allow selecting columns from both DFs. Is there a way to pass the selectExpr columns dynamically, along with which dataframe each should be selected from, into my def? I'm using Spark 2.2.0.
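For context on the cols.head, cols.tail: _* idiom mentioned above, here is a plain-Scala sketch (no Spark involved, and selectLike is a made-up stand-in): DataFrame.select(col: String, cols: String*) takes one required argument plus varargs, which is why a Seq has to be split into a head and an expanded tail:

```scala
// A stand-in with the same shape as DataFrame.select(col: String, cols: String*):
// one required first argument plus varargs, so a Seq must be split to call it.
def selectLike(first: String, rest: String*): Seq[String] = first +: rest

val cols = Seq("type", "account", "value")
val expanded = selectLike(cols.head, cols.tail: _*)
// expanded: Seq("type", "account", "value")
```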

Answer


It is possible to pass the select expression as a Seq[Column] to the method:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Column, joinType: String, selectExpr: Seq[Column]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr: _*)
}


To call the method, use:

val joinExpr = df1.col("id") === df2.col("id")
val selectExpr = Seq(df1.col("type"), df1.col("account"), df2.col("value"))

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

This gives the desired result:

+------+-------+-----+
|  type|account|value|
+------+-------+-----+
|   new|current|    7|
|closed| saving|    5|
+------+-------+-----+
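A variant worth noting (a sketch under the assumption of a running SparkSession, not part of the original answer): aliasing each dataframe lets the select list stay as plain strings while still encoding which side each column comes from:

```scala
import org.apache.spark.sql.functions.col

// Hypothetical variant: alias each dataframe so provenance lives in the
// "alias.column" string, keeping the select list dynamic.
// df1 and df2 are the dataframes defined in the question.
val joined = df1.as("a").join(df2.as("b"), col("a.id") === col("b.id"), "inner")
val selectCols = Seq("a.type", "a.account", "b.value").map(col)
val result = joined.select(selectCols: _*)  // same rows as the result above
```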




In the selectExpr above, it is necessary to specify which dataframe each column comes from. However, this can be simplified further if the following assumptions hold:

  1. The columns to join on have the same name in both dataframes
  2. The columns to be selected have unique names (the other dataframe does not have a column with the same name)


In this case, the joinExpr: Column can be changed to joinExpr: Seq[String] and selectExpr: Seq[Column] to selectExpr: Seq[String]:

def joinDF(df1: DataFrame, df2: DataFrame, joinExpr: Seq[String], joinType: String, selectExpr: Seq[String]): DataFrame = {
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr.head, selectExpr.tail: _*)
}


Calling the method now looks cleaner:

val joinExpr = Seq("id")
val selectExpr = Seq("type", "account", "value")

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)


Note: When the join is performed using a Seq[String], the column names of the resulting dataframe differ from those produced by a join expression: the join column appears only once. If columns with the same name are present, there is no way to select them separately afterwards.
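To make that note concrete, a short sketch (reusing df1 and df2 from the question; assumes a running SparkSession):

```scala
// Seq[String] join: the join column "id" is deduplicated in the output.
val dedup = df1.join(df2, Seq("id"), "inner")
// dedup.columns: Array(id, type, account, value)

// Column-expression join: both "id" columns survive, and any later
// reference must be disambiguated through the parent dataframe.
val both = df1.join(df2, df1("id") === df2("id"), "inner")
// both.columns: Array(id, type, account, id, value)
val picked = both.select(df1("id"), df2("value"))  // unambiguous
// both.select(col("id"))                          // would be ambiguous
```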
