Lookup in Spark dataframes


Question

I am using Spark 1.6 and I would like to know how to implement a lookup in dataframes.

I have two dataframes, employee & department.

  • Employee Dataframe

    ------------------
    Emp Id | Emp Name
    ------------------
    1      | john
    2      | David

  • Department Dataframe

    ----------------------------
    Dept Id | Dept Name | Emp Id
    ----------------------------
    1       | Admin     | 1
    2       | HR        | 2

  • I would like to look up the emp id from the employee table in the department table and get the dept name. So, the resultset would be:

    ------------------
    Emp Id | Dept Name
    ------------------
    1      | Admin
    2      | HR

How do I implement this lookup UDF feature in Spark? I don't want to use JOIN on both the dataframes.

Answer

As already mentioned in the comments, joining the dataframes is the way to go.
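For completeness, the join that the comments recommend is a one-liner in the DataFrame API. This is only a sketch, assuming the same `sc`/`sqlContext` setup as the answer's code and Spark 1.6's single-column `join(right, usingColumn)` overload; the column names are illustrative:

```scala
import sqlContext.implicits._

// hypothetical column names matching the question's tables
val empDF  = sc.parallelize(Seq((1, "John"), (2, "David"))).toDF("EmpID", "EmpName")
val depsDF = sc.parallelize(Seq((1, "Admin", 1), (2, "HR", 2))).toDF("DepID", "DeptName", "EmpID")

// inner join on EmpID, then keep only the requested columns
depsDF.join(empDF, "EmpID")
      .select($"EmpID", $"DeptName")
      .show()
```

Unlike the collect-to-driver approach below, the join stays fully distributed and does not require the employee table to fit in driver memory.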

You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup table into driver memory. Also note that this approach assumes that EmpID is unique:

import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map

val emp = Seq((1, "John"), (2, "David"))
val deps = Seq((1, "Admin", 1), (2, "HR", 2))

val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID", "Name", "EmpID")

// collect the employee pairs to the driver as a Map(EmpID -> Name)
val lookupMap = empRdd.collectAsMap()

// wrap the map in a UDF; Map.get returns an Option, so a missing ID becomes null
def lookup(lookupMap: Map[Int, String]) = udf((empID: Int) => lookupMap.get(empID))

val combinedDF = depsDF
  .withColumn("empNames", lookup(lookupMap)($"EmpID"))

My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but this of course does not work, because you cannot have Spark actions (i.e. lookup) within transformations (i.e. the UDF).

If your empDf has multiple columns (e.g. Name, Age), you can use this:

val empRdd = empDf.rdd.map { row =>
  (row.getInt(0), (row.getString(1), row.getInt(2)))
}

// collect to the driver: Map(EmpID -> (Name, Age))
val lookupMap = empRdd.collectAsMap()

// Map extends PartialFunction, so .lift also returns an Option
def lookup(lookupMap: Map[Int, (String, Int)]) =
  udf((empID: Int) => lookupMap.lift(empID))

depsDF
  .withColumn("lookup", lookup(lookupMap)($"EmpID"))
  .withColumn("empName", $"lookup._1")
  .withColumn("empAge", $"lookup._2")
  .drop($"lookup")
  .show()
    
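As an aside, the `lookupMap.lift(empID)` call works because Scala's `Map` extends `PartialFunction`, so `lift` is equivalent to `get` here: both return an `Option`, which Spark turns into a nullable column. A minimal non-Spark illustration (the age values are hypothetical, matching the Name/Age example above):

```scala
import scala.collection.Map

// plain-Scala stand-in for the collected lookup map (hypothetical data)
val lookupMap: Map[Int, (String, Int)] = Map(1 -> ("John", 30), 2 -> ("David", 25))

println(lookupMap.get(1))   // Some((John,30))
println(lookupMap.lift(3))  // None — a missing key yields None, not an exception
```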
