Lookup in Spark dataframes


Question

I am using Spark 1.6 and I would like to know how to implement a lookup in dataframes.

I have two dataframes: employee and department.

  • Employee Dataframe

Emp Id | Emp Name
-------|---------
1      | John
2      | David

  • Department Dataframe

    Dept Id | Dept Name | Emp Id
    --------|-----------|-------
    1       | Admin     | 1
    2       | HR        | 2


  • I would like to look up the emp id from the employee table in the department table and get the dept name. So, the result set would be:

    Emp Id | Dept Name
    -------|----------
    1      | Admin
    2      | HR


    How do I implement this lookup UDF feature in Spark? I don't want to use a JOIN on both dataframes.

    Answer

    As already mentioned in the comments, joining the dataframes is the way to go.
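    The join itself is a one-liner; a minimal sketch, assuming the employee/department data from the question (the column names `EmpName` and `DeptName` are chosen here for illustration):

    ```scala
    import sqlContext.implicits._

    // Column names are hypothetical; adjust to your actual schema.
    val empDF  = sc.parallelize(Seq((1, "John"), (2, "David"))).toDF("EmpID", "EmpName")
    val depsDF = sc.parallelize(Seq((1, "Admin", 1), (2, "HR", 2))).toDF("DepID", "DeptName", "EmpID")

    // Inner join on the shared EmpID column, keeping only the requested columns.
    depsDF.join(empDF, "EmpID").select($"EmpID", $"DeptName").show()
    ```

    Unlike the collect-based lookup below, the join stays fully distributed, so it also works when the employee table does not fit in driver memory.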

    You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup table into driver memory. Also note that this approach assumes that EmpID is unique:

    import org.apache.spark.sql.functions._
    import sqlContext.implicits._
    import scala.collection.Map

    val emp  = Seq((1, "John"), (2, "David"))
    val deps = Seq((1, "Admin", 1), (2, "HR", 2))

    val empRdd = sc.parallelize(emp)
    val depsDF = sc.parallelize(deps).toDF("DepID", "Name", "EmpID")

    // Collect the employee pairs into a driver-side Map (EmpID -> name).
    val lookupMap = empRdd.collectAsMap()

    // Wrap the map lookup in a UDF; returning an Option makes missing
    // keys come back as null in the resulting column.
    def lookup(lookupMap: Map[Int, String]) = udf((empID: Int) => lookupMap.get(empID))

    val combinedDF = depsDF
      .withColumn("empNames", lookup(lookupMap)($"EmpID"))
    

    My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but of course that does not work, because you cannot run Spark actions (i.e. lookup) within transformations (i.e. the UDF).

    If your empDf has multiple columns (e.g. Name, Age), you can use this:

    // Map each employee row to (EmpID, (Name, Age)).
    val empRdd = empDf.rdd.map { row =>
      (row.getInt(0), (row.getString(1), row.getInt(2)))
    }

    val lookupMap = empRdd.collectAsMap()

    // lift turns the Map into a total function returning an Option,
    // so missing EmpIDs yield null instead of throwing.
    def lookup(lookupMap: Map[Int, (String, Int)]) =
      udf((empID: Int) => lookupMap.lift(empID))

    depsDF
      .withColumn("lookup", lookup(lookupMap)($"EmpID"))
      .withColumn("empName", $"lookup._1")
      .withColumn("empAge", $"lookup._2")
      .drop($"lookup")
      .show()
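    One refinement worth mentioning (not part of the original answer): instead of capturing lookupMap in the UDF closure, which re-serializes the map with every task, you can ship it once per executor as a Spark broadcast variable. A sketch, assuming the lookupMap and depsDF defined above:

    ```scala
    // Ship the driver-side map to each executor exactly once.
    val bcLookup = sc.broadcast(lookupMap)

    def lookupBc = udf((empID: Int) => bcLookup.value.get(empID))

    depsDF
      .withColumn("empNames", lookupBc($"EmpID"))
      .show()
    ```

    For small maps the difference is negligible, but for larger lookup tables the broadcast avoids repeated serialization overhead.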
    
