change data capture in spark


Problem description

I have got a requirement, but I am confused about how to do it. I have two dataframes. The first time, I got the data below (file1):

prodid, lastupdatedate, indicator

00001,,A
00002,01-25-1981,A
00003,01-26-1982,A
00004,12-20-1985,A

The output should be:

00001,1900-01-01,2400-01-01,A
00002,1981-01-25,2400-01-01,A
00003,1982-01-26,2400-01-01,A
00004,1985-12-20,2400-01-01,A

The second time, I got another file (file2):

prodid, lastupdatedate, indicator

00002,01-25-2018,U
00004,01-25-2018,U
00006,01-25-2018,A
00008,01-25-2018,A

And I want the final result to be:

00001,1900-01-01,2400-01-01,A
00002,1981-01-25,2018-01-25,I
00002,2018-01-25,2400-01-01,A
00003,1982-01-26,2400-01-01,A
00004,1985-12-20,2018-01-25,I
00004,2018-01-25,2400-01-01,A
00006,2018-01-25,2400-01-01,A
00008,2018-01-25,2400-01-01,A

So whatever updates are there in the second file, that date should come in the second column, the default date (2400-01-01) should come in the third column, and the relevant indicator should follow. The default indicator is A.

I have started like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("creating data frame for csv")
  .getOrCreate()

import spark.implicits._

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("d:/prod.txt")

val df1 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("d:/prod1.txt")

val newdf = df.na.fill("01-01-1900", Seq("lastupdatedate"))

if ((df1("indicator") == 'U') && (df1("prodid") == newdf("prodid"))) {
  val df3 = df1.except(newdf)
}

Recommended answer

You should join them on prodid and use some when functions to manipulate the dataframes into the expected output. Then filter the updated rows, derive the new active records from them, and merge them back (I have included comments explaining each part of the code):

import org.apache.spark.sql.functions._
//filling empty lastupdatedate and changing the date to the expected format
val newdf = df.na.fill("01-01-1900",Seq("lastupdatedate"))
  .withColumn("lastupdatedate", date_format(unix_timestamp(trim(col("lastupdatedate")), "MM-dd-yyyy").cast("timestamp"), "yyyy-MM-dd"))

//changing the date to the expected format of the second dataframe
val newdf1 = df1.withColumn("lastupdatedate", date_format(unix_timestamp(trim(col("lastupdatedate")), "MM-dd-yyyy").cast("timestamp"), "yyyy-MM-dd"))
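
As a side note, the same conversion can be written more directly with to_date on Spark 2.2+ (a minimal alternative sketch, not part of the original answer; newdf1Alt is a hypothetical name):

//equivalent conversion, assuming Spark 2.2+ where to_date accepts a format string
val newdf1Alt = df1.withColumn("lastupdatedate",
  date_format(to_date(trim(col("lastupdatedate")), "MM-dd-yyyy"), "yyyy-MM-dd"))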

//joining both dataframes and updating columns according to your needs
val tempdf = newdf.as("table1").join(newdf1.as("table2"),Seq("prodid"), "outer")
    .select(col("prodid"),
      when(col("table1.lastupdatedate").isNotNull, col("table1.lastupdatedate")).otherwise(col("table2.lastupdatedate")).as("lastupdatedate"),
      when(col("table1.indicator").isNotNull, when(col("table2.lastupdatedate").isNotNull, col("table2.lastupdatedate")).otherwise(lit("2400-01-01"))).otherwise(lit("2400-01-01")).as("defaultdate"),
      when(col("table2.indicator").isNull, col("table1.indicator")).otherwise(when(col("table2.indicator") === "U", lit("I")).otherwise(col("table2.indicator"))).as("indicator"))

//deriving the new active rows for the updated products from tempdf
val filtereddf = tempdf.filter(col("indicator") === "I")
                        .withColumn("lastupdatedate", col("defaultdate"))
                        .withColumn("defaultdate", lit("2400-01-01"))
                        .withColumn("indicator", lit("A"))

//finally merging both dataframes
tempdf.union(filtereddf).sort("prodid", "lastupdatedate").show(false)

which should give you:

+------+--------------+-----------+---------+
|prodid|lastupdatedate|defaultdate|indicator|
+------+--------------+-----------+---------+
|1     |1900-01-01    |2400-01-01 |A        |
|2     |1981-01-25    |2018-01-25 |I        |
|2     |2018-01-25    |2400-01-01 |A        |
|3     |1982-01-26    |2400-01-01 |A        |
|4     |1985-12-20    |2018-01-25 |I        |
|4     |2018-01-25    |2400-01-01 |A        |
|6     |2018-01-25    |2400-01-01 |A        |
|8     |2018-01-25    |2400-01-01 |A        |
+------+--------------+-----------+---------+
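
If you need to persist the merged result rather than just show it, the standard DataFrameWriter works here (a minimal sketch; the output path is hypothetical):

//write the merged result as a single CSV file with a header
tempdf.union(filtereddf)
  .sort("prodid", "lastupdatedate")
  .coalesce(1)
  .write
  .option("header", "true")
  .csv("d:/prod_merged")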
