如何验证历史数据? [英] How to validate history data?

查看:32
本文介绍了如何验证历史数据?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

目前我们正在使用日历实例读取日期,以使用 sparksql 选择最后一个月的记录.现在我们需要: 如果前一天添加了额外的事件,我们还必须能够手动插入摘要开始和结束日期,以防我们需要手动重新运行前一个时间段的作业:例如:手动重新运行表可能如下所示:

rprtng_period_type_cd summary_start_date summary_end_date summary_iv男 2018-01-01 2018-01-31 2018-01D 2018-03-05 2018-03-05 2018-03-05D 2018-03-27 2018-03-27 2018-03-27

这应该告诉工作计算 1 月 18 日的月度摘要和两个每日摘要,一个是 3 月 5 日,一个是 3 月 27 日

该作业应采用 summary_start_date summary_end_date 并确保仅将 event_dt 介于这两个日期之间的事件包含在计算中.

我当前的代码片段如下所示:

def execute(): Dataframe = {//日志文件val hivecntxt = SparkContextLoader.hiveContextval eventsourceTable= cusotmermetricConstants.source_table//日历信息val abc = Calendar.getInstanceabc.add(Calendar.month, -1)var 月份 = abc.get(Calendar.MONTH)var year = abc.get(Calendar.YEAR)var fileMonth = 月 + 1var monthStr = if (fileMonth<=9) {monthStr ="0" + fileMonth.toString} 别的 {monthStr = fileMonth.toString}//测试目的monthStr = "11"年 = 2016valmonthlyEventDf = hiveContext.sql("select * from " + referenceDB + " ." + eventsourceTable + "where(unix_timestamp(event_Dt, "yyyy-mm"))")=unix_timestamp("' +year+ "-"+"monthstr"+"',+'yyyy-MM'))")val uniquedf =monthlyEventDf.repartition(col("event_Id")).withColumn("rank",rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue")))valmonthlyEventfinal =monthlyEventDf.persist(StorageLevel.Memory_AND_DISK)每月返回Eventfinal}

我们可以在哪里编辑当前模块中的需求寻求建议

解决方案

您可以使用 filter 功能选择范围内的记录,如下所示

//输入df+---+-----------+-----------+|cd|开始日期|结束日期|+---+-----------+-----------+|男|2018-01-01|2018-01-31||D|2018-05-03|2018-05-03||D|2018-03-27|2018-03-27|+---+-----------+-----------+//参数startDate和endDateval endDate="2018-05-03"val endDate="2018-05-03"//过滤条件df.filter(s"start_date>='$startDate' and end_date<='$endDate'").show//样本输出:+---+-----------+-----------+|cd|开始日期|结束日期|+---+-----------+-----------+|D|2018-05-03|2018-05-03||D|2018-03-27|2018-03-27|+---+-----------+-----------+

我希望这对您有所帮助,如果您想对过滤后的记录进行任何计算,则必须将列传递给 udf

Currently we are reading date using calendar instance for picking last one month record using sparksql. Now we need: In case of extra events being added to previous day we must also be able to manually insert summary start and end dates, in case we need manual re run of job for a previous time period: e.g: a manual re run table could look like this:

rprtng_period_type_cd  summary_start_date summary_end_date  summary_iv
M                         2018-01-01      2018-01-31        2018-01
D                         2018-03-05      2018-03-05        2018-03-05
D                         2018-03-27      2018-03-27        2018-03-27

This should tell the job to calculate a monthly summary for Jan18 and two daily summaries, one for 05 march and one for 27 march

The job should take summary_start_date summary_end_date and ensure that only events with an event_dt between those two dates are only included in calculations.

My current code snippet looks like:

def execute(): Dataframe = {
  //log files
  val hivecntxt = SparkContextLoader.hiveContext
  val eventsourceTable= cusotmermetricConstants.source_table

  // Calendar information
  val abc = Calendar.getInstance
  abc.add(Calendar.month, -1)
  var month = abc.get(Calendar.MONTH)
  var year = abc.get(Calendar.YEAR)
  var fileMonth = month + 1
  var monthStr = if (fileMonth<=9) {
    monthStr ="0" + fileMonth.toString
  } else {
    monthStr = fileMonth.toString
  }

  //testing purpose
  monthStr = "11"
  year = 2016
  val monthlyEventDf = hiveContext.sql("select * from " + referenceDB + " ." + eventsourceTable + "where(unix_timestamp(event_Dt, "yyyy-mm"))")=unix_timestamp("' +year+ "-"+"monthstr"+"',+'yyyy-MM'))")
  val uniquedf = monthlyEventDf.repartition(col("event_Id")).withColumn("rank",rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue")))
  val monthlyEventfinal = monthlyEventDf.persist(StorageLevel.Memory_AND_DISK)

  return monthlyEventfinal
}

Where can we edit our requirement in current module Looking for suggestions

解决方案

You can use filter function to select records in range like below

//Input df

+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  M|2018-01-01|2018-01-31|
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

//Parameter startDate and endDate
val endDate="2018-05-03"

val endDate="2018-05-03"

//Filter condition
df.filter(s"start_date>='$startDate' and end_date<='$endDate'").show

//Sample Output: 
+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

I hope this will help you, If you want to do any calculation on filtered records then you have to pass columns to udf

这篇关于如何验证历史数据?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆