Lookup in Spark dataframes
Question
I am using Spark 1.6 and I would like to know how to implement a lookup in dataframes.
I have two dataframes, employee & department.
Employee Dataframe
-------------------
Emp Id | Emp Name
------------------
1 | john
2 | David
Department Dataframe
--------------------
Dept Id | Dept Name | Emp Id
-----------------------------
1 | Admin | 1
2 | HR | 2
I would like to look up the emp id from the employee table in the department table and get the dept name. So, the result set would be:
Emp Id | Dept Name
-------------------
1 | Admin
2 | HR
How do I implement this lookup as a UDF in Spark? I don't want to use a JOIN on the two dataframes.
Answer
As already mentioned in the comments, joining the dataframes is the way to go.
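For reference, a join-based version might look roughly like this. This is only a sketch, assuming a Spark 1.6 shell where sc and sqlContext are available, and using the same column names as the lookup example below:

```scala
// Sketch only: requires a Spark 1.6 shell (sc and sqlContext in scope).
import sqlContext.implicits._

val empDF  = sc.parallelize(Seq((1, "John"), (2, "David"))).toDF("EmpID", "EmpName")
val depsDF = sc.parallelize(Seq((1, "Admin", 1), (2, "HR", 2))).toDF("DepID", "Name", "EmpID")

// Inner join on the shared EmpID column, then keep the requested columns.
depsDF.join(empDF, "EmpID").select($"EmpID", $"Name").show()
```

Unlike the collected-map approach below, the join stays fully distributed and does not require the employee table to fit in driver memory.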
You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup table into driver memory. Also note that this approach assumes that EmpID is unique:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map

val emp = Seq((1, "John"), (2, "David"))
val deps = Seq((1, "Admin", 1), (2, "HR", 2))

val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID", "Name", "EmpID")

// collect the employee pairs into an ordinary Map on the driver
val lookupMap = empRdd.collectAsMap()

// wrap the map in a UDF; Map.get returns an Option, so unknown IDs become null
def lookup(lookupMap: Map[Int, String]) = udf((empID: Int) => lookupMap.get(empID))

val combinedDF = depsDF
  .withColumn("empNames", lookup(lookupMap)($"EmpID"))
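The null behaviour of this UDF comes straight from Map.get: once collectAsMap() has produced an ordinary Scala Map on the driver, a missing EmpID yields None, which Spark renders as null in the column. Plain Scala, no Spark needed:

```scala
// collectAsMap() yields an ordinary Scala Map on the driver.
// Map.get returns an Option, so EmpIDs absent from the employee
// dataframe surface as None (null in the resulting empNames column).
val lookupMap: Map[Int, String] = Map(1 -> "John", 2 -> "David")

val known   = lookupMap.get(2)   // Some(David)
val missing = lookupMap.get(99)  // None -> null in the column
```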
My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but of course this does not work, because you cannot have Spark actions (i.e. lookup) within transformations (i.e. the UDF).
If your empDf has multiple columns (e.g. Name, Age), you can use this:
// key by EmpID, keep the remaining columns as a tuple
val empRdd = empDf.rdd.map { row =>
  (row.getInt(0), (row.getString(1), row.getInt(2)))
}
val lookupMap = empRdd.collectAsMap()

// lift turns the Map into Int => Option[(String, Int)], like get
def lookup(lookupMap: Map[Int, (String, Int)]) =
  udf((empID: Int) => lookupMap.lift(empID))

depsDF
  .withColumn("lookup", lookup(lookupMap)($"EmpID"))
  .withColumn("empName", $"lookup._1")
  .withColumn("empAge", $"lookup._2")
  .drop($"lookup")
  .show()
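The lift call works because Map extends PartialFunction; for a Map it behaves the same as get, returning an Option. A plain-Scala illustration (the ages here are made-up sample values):

```scala
// Map[K, V] is a PartialFunction[K, V], so .lift gives a total
// function K => Option[V] -- for a Map this is equivalent to .get.
val m: Map[Int, (String, Int)] = Map(1 -> ("John", 28), 2 -> ("David", 35))

assert(m.lift(1) == m.get(1))   // Some((John,28))
assert(m.lift(99) == None)      // missing key -> None
```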