Generating join condition dynamically in Spark/Scala

Problem Description

I want to be able to pass the join condition for two data frames as an input string. The idea is to make the join generic enough so that the user can pass in whatever condition they like.

Here's how I am doing it right now. Although it works, I think it's not clean.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Each "x=y" string becomes an equality Column; reduce ANDs them together
val testInput = Array("a=b", "c=d")
val condition: Column = testInput.map(x => testMethod(x)).reduce((a, b) => a.and(b))
firstDataFrame.join(secondDataFrame, condition, "fullouter")
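
For the sample input above, the map-and-reduce pipeline builds the same expression you would write by hand; the following one-liner is shown purely to illustrate what condition evaluates to:

// Equivalent hand-written condition for testInput = Array("a=b", "c=d")
val equivalent: Column = col("a") === col("b") and (col("c") === col("d"))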

Here's the testMethod:

def testMethod(inputString: String): Column = {
  // Split "x=y" on "=" and build an equality between the two columns
  val splitted = inputString.split("=")
  col(splitted(0)) === col(splitted(1))
}
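
As an aside, if the concern is that this parsing is fragile, a pattern match on the split result fails fast on malformed input. This is only a sketch, and testMethodSafe is a hypothetical name, not part of the question:

// Hypothetical hardened variant of testMethod: reject anything that isn't "col1=col2"
def testMethodSafe(inputString: String): Column = inputString.split("=") match {
  case Array(left, right) => col(left) === col(right)
  case _ => throw new IllegalArgumentException(s"Expected 'col1=col2', got: $inputString")
}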

I need help figuring out a better way of taking input to generate the join condition dynamically.

Recommended Answer

I'm not sure a custom method like this would provide much benefit, but if you must go down that path, I would recommend making it also cover joins on:

  1. columns of the same name (this is very common)
  2. inequality conditions

Sample code below:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def joinDFs(dfL: DataFrame, dfR: DataFrame, conditions: List[String], joinType: String) = {
  // Each condition string has the form "leftCol op rightCol": map the operator
  // to the corresponding Column comparison, then AND all conditions together
  val joinConditions = conditions.map( cond => {
      val arr = cond.split("\\s+")
      if (arr.size != 3) throw new Exception("Invalid join conditions!") else
        arr(1) match {
          case "<"  => dfL(arr(0)) <   dfR(arr(2))
          case "<=" => dfL(arr(0)) <=  dfR(arr(2))
          case "="  => dfL(arr(0)) === dfR(arr(2))
          case ">=" => dfL(arr(0)) >=  dfR(arr(2))
          case ">"  => dfL(arr(0)) >   dfR(arr(2))
          case "!=" => dfL(arr(0)) =!= dfR(arr(2))
          case _ => throw new Exception("Invalid join conditions!")
        }
    } ).
    reduce(_ and _)

  dfL.join(dfR, joinConditions, joinType)
}

// Sample data; toDF requires spark.implicits._ in scope (pre-imported in spark-shell)
val dfLeft = Seq(
  (1, "2018-04-01", "p"),
  (1, "2018-04-01", "q"),
  (2, "2018-05-01", "r")
).toDF("id", "date", "value")

val dfRight = Seq(
  (1, "2018-04-15", "x"),
  (2, "2018-04-15", "y")
).toDF("id", "date", "value")

val conditions = List("id = id", "date <= date")

joinDFs(dfLeft, dfRight, conditions, "left_outer").
  show
// +---+----------+-----+----+----------+-----+
// | id|      date|value|  id|      date|value|
// +---+----------+-----+----+----------+-----+
// |  1|2018-04-01|    p|   1|2018-04-15|    x|
// |  1|2018-04-01|    q|   1|2018-04-15|    x|
// |  2|2018-05-01|    r|null|      null| null|
// +---+----------+-----+----+----------+-----+
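Since both inputs share column names, the joined result above contains duplicate id/date/value columns. One way to disambiguate them afterwards is to select through the originating DataFrame references; this is a sketch using standard Spark column resolution, not part of the original answer:

// Resolve the ambiguous names via the original DataFrame references
joinDFs(dfLeft, dfRight, conditions, "left_outer").
  select(dfLeft("id"), dfLeft("date").as("l_date"), dfRight("date").as("r_date")).
  show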
