How to use window specification and join condition per column values?


Problem Description

This is my DF1:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction
4295858898|^|204|^|205|^|1|^|I|!|
4295858898|^|204|^|208|^|2|^|I|!|
4295858898|^|204|^|209|^|2|^|I|!|
4295858898|^|204|^|211|^|3|^|I|!|
4295858898|^|204|^|212|^|3|^|I|!|
4295858898|^|204|^|214|^|4|^|I|!|
4295858898|^|204|^|215|^|4|^|I|!|
4295858898|^|206|^|207|^|1|^|I|!|
4295858898|^|206|^|210|^|2|^|I|!|
4295858898|^|206|^|213|^|3|^|I|!|

This is my DF2:

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2002|^|1511224917595|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917596|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917597|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917598|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917599|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917600|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917601|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917602|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917603|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917604|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917605|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917606|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917607|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917608|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917609|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917610|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917611|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917612|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917613|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917614|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917615|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917616|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917617|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917618|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917619|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917620|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917621|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1511224917622|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1511224917642|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917643|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917644|^|4295858941|^|null|^|37|^|null|^|D|!|

I want to implement the join based on the value of a column.

This is what I am trying to achieve in Spark/Scala, for example, but I don't know how to implement it.

If FFAction_1 = I in DF2, then apply the condition below (join and partitionBy on the three columns "OrganizationId", "AnnualPeriodId", "InterimPeriodId"):

val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)

val latestForEachKey = df2result
  .withColumn("rank", rank().over(windowSpec))
  .filter($"rank" === 1)
  .drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear
  .join(latestForEachKey, Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|")))
      .otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

If FFAction_1 = O or D, then apply the condition below (join and partitionBy on the two columns "OrganizationId", "InterimPeriodId"):

val windowSpec = Window.partitionBy("OrganizationId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)

val latestForEachKey = df2result
  .withColumn("rank", rank().over(windowSpec))
  .filter($"rank" === 1)
  .drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear
  .join(latestForEachKey, Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|")))
      .otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

Below is my complete code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._  // udf, input_file_name, rank, first, when, lit, concat, concat_ws, regexp_replace and col are all used below

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))


//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


 //------------------------------- filtering only the latest rows from the incremental data ------------------------------
 
    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")


    val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

//----------------- separating the incremental df into insert, delete and overwrite rows ----------------

    //--------------- insert rows are selected -------------------------------
    // insert a row if I is detected; if O is found, first delete and then insert
    
    val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

    //------------------ delete rows using the primary key "OrganizationId", "InterimPeriodId" ------------------
    // delete rows from the parent if either D or O is found in the incremental data
    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

    //join by two primary keys for deletion and delete from the parent dataframe
    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

val dfToSave=dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)


    dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

   val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "PartitionYear").count
  
  FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")

Recommended Answer

DISCLAIMER: Somehow this and the other question I've just answered look like duplicates, so one of them is going to get marked as such soon, or we'll find out the difference between them and the disclaimer goes away. Time will tell.

Given the requirement to select the final window specification and join condition based on the values of the FFAction_1 column, I'd do the filter first and then decide which window aggregation and join to use.

val df1 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df1.csv").
  select("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction")
scala> df1.show
+--------------+--------------+---------------+-------------+--------+
|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber|FFAction|
+--------------+--------------+---------------+-------------+--------+
|    4295858898|           204|            205|            1|       I|
|    4295858898|           204|            208|            2|       I|
|    4295858898|           204|            209|            2|       I|
|    4295858898|           204|            211|            3|       I|
|    4295858898|           204|            212|            3|       I|
|    4295858898|           204|            214|            4|       I|
|    4295858898|           204|            215|            4|       I|
|    4295858898|           206|            207|            1|       I|
|    4295858898|           206|            210|            2|       I|
|    4295858898|           206|            213|            3|       I|
+--------------+--------------+---------------+-------------+--------+

The right-hand side of the join is fairly similar in "shape".

val df2 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df2.csv").
  select("DataPartition_1", "PartitionYear_1", "TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber_1", "FFAction_1")
scala> df2.show
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|  DataPartition_1|PartitionYear_1|    TimeStamp|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber_1|FFAction_1|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|SelfSourcedPublic|           2002|1510725106270|    4295858941|            24|             25|              4|         O|
|SelfSourcedPublic|           2002|1510725106271|    4295858941|            24|             25|              5|         O|
|SelfSourcedPublic|           2003|1510725106272|    4295858941|            30|             31|              2|         O|
|SelfSourcedPublic|           2003|1510725106273|    4295858941|            30|             31|              3|         O|
|SelfSourcedPublic|           2001|1510725106293|    4295858941|             5|             20|              2|         O|
|SelfSourcedPublic|           2001|1510725106294|    4295858941|             5|             21|              3|         O|
|SelfSourcedPublic|           2002|1510725106295|    4295858941|             1|             22|              4|         O|
|SelfSourcedPublic|           2002|1510725106296|    4295858941|             1|             23|              5|         O|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         I|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         D|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+

With the above datasets, I'd filter to see whether there is at least one I in df2's FFAction_1 column and select the correct window specification and join condition accordingly.

The trick is to use the join operator followed by the where (or filter) operator, so you can decide which join condition to use.

val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty
val (windowSpec, joinCond) = if (noIs) {
  (windowSpecForOs, joinForOs) 
} else {
  (windowSpecForIs, joinForIs)
}
val latestForEachKey = df2result.withColumn("rank", rank() over windowSpec)
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey).where(joinCond)
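The snippet above leaves windowSpecForIs, windowSpecForOs, joinForIs and joinForOs undefined and mixes the df2/df2result names. A minimal, self-contained sketch of one way to fill those in, assuming the three-column key for the I case and the two-column key for the O/D case as described in the question, and expressing the join conditions as key-column sequences (joinKeys) instead of a where clause, could look like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import spark.implicits._

// I case (assumed from the question): rank and join on the three-column key
val windowSpecForIs = Window
  .partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)
val joinForIs = Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId")

// O/D case (assumed from the question): rank and join on the two-column key
val windowSpecForOs = Window
  .partitionBy("OrganizationId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)
val joinForOs = Seq("OrganizationId", "InterimPeriodId")

// pick the window specification and join keys based on whether df2 contains at least one I row
val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty
val (windowSpec, joinKeys) = if (noIs) (windowSpecForOs, joinForOs) else (windowSpecForIs, joinForIs)

// keep only the latest row per key, then join on the chosen key columns
val latestForEachKey = df2
  .withColumn("rank", rank().over(windowSpec))
  .filter($"rank" === 1)
  .drop("rank", "TimeStamp")
val dfMainOutput = df1.join(latestForEachKey, joinKeys, "outer")

This keeps the "decide first, then join" idea of the answer; using join(latestForEachKey, joinKeys, "outer") rather than a cross join plus where avoids ambiguous column references on the shared key columns.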
