How to validate history data?


Question

Currently we use a Calendar instance to compute the date range and pick the last month's records with Spark SQL. Now we also need the following: if extra events are added for a previous day, we must be able to manually insert summary start and end dates, so that the job can be re-run for an earlier time period. For example, a manual re-run table could look like this:

rprtng_period_type_cd  summary_start_date summary_end_date  summary_iv
M                         2018-01-01      2018-01-31        2018-01
D                         2018-03-05      2018-03-05        2018-03-05
D                         2018-03-27      2018-03-27        2018-03-27

This should tell the job to calculate a monthly summary for January 2018 and two daily summaries, one for 5 March and one for 27 March.
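Before the job trusts a manually inserted row, it can check that the row is internally consistent. The sketch below assumes a hypothetical RerunRow case class mirroring the table's columns and encodes two assumed rules: an M row must span exactly one calendar month with summary_iv in yyyy-MM form, and a D row must start and end on the same day with summary_iv equal to that date.

```scala
import java.time.{LocalDate, YearMonth}

// Hypothetical model of one row of the manual re-run table.
final case class RerunRow(periodTypeCd: String, startDate: LocalDate, endDate: LocalDate, summaryIv: String)

object RerunValidation {
  // Returns None when the row is consistent, otherwise Some(reason).
  def validate(row: RerunRow): Option[String] = row.periodTypeCd match {
    case "M" =>
      // Assumed rule: a monthly row spans exactly one calendar month.
      val ym = YearMonth.from(row.startDate)
      if (row.startDate != ym.atDay(1) || row.endDate != ym.atEndOfMonth()) Some(s"M row must cover all of $ym")
      else if (row.summaryIv != ym.toString) Some(s"summary_iv should be $ym")
      else None
    case "D" =>
      // Assumed rule: a daily row starts and ends on the same day.
      if (row.startDate != row.endDate) Some("D row must start and end on the same day")
      else if (row.summaryIv != row.startDate.toString) Some(s"summary_iv should be ${row.startDate}")
      else None
    case other => Some(s"unknown rprtng_period_type_cd: $other")
  }
}
```

All three rows of the example table pass this check; a row such as D 2018-03-05 to 2018-03-06 would be rejected.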

The job should take summary_start_date and summary_end_date and ensure that only events with an event_dt between those two dates are included in the calculations.
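The date-range rule itself is a simple inclusive predicate. The sketch below shows it on plain Scala collections with a hypothetical Event case class; in Spark the equivalent condition would be written with Column.between (for example col("event_dt").between(start, end)), which, like SQL BETWEEN, includes both bounds.

```scala
import java.time.LocalDate

// Hypothetical event record; only event_dt matters for the date filter.
final case class Event(eventId: String, eventDt: LocalDate)

object WindowFilter {
  // Keep only events with start <= event_dt <= end (both bounds inclusive),
  // mirroring SQL `event_dt BETWEEN start AND end`.
  def inWindow(events: Seq[Event], start: LocalDate, end: LocalDate): Seq[Event] =
    events.filter(e => !e.eventDt.isBefore(start) && !e.eventDt.isAfter(end))
}
```

For a daily row (start equal to end) this keeps exactly the events of that day.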

My current code is as follows:

import java.util.Calendar
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, rank}
import org.apache.spark.storage.StorageLevel

def execute(): DataFrame = {
  // Hive context and source table come from project-specific helpers
  val hiveContext = SparkContextLoader.hiveContext
  val eventsourceTable = cusotmermetricConstants.source_table

  // Calendar information: go back one month from today
  val abc = Calendar.getInstance
  abc.add(Calendar.MONTH, -1)
  var year = abc.get(Calendar.YEAR)
  val fileMonth = abc.get(Calendar.MONTH) + 1 // Calendar.MONTH is zero-based
  var monthStr = f"$fileMonth%02d"            // zero-pad to two digits, e.g. "03"

  // testing purpose
  monthStr = "11"
  year = 2016

  // Select all events whose event_Dt falls in the chosen year-month
  val monthlyEventDf = hiveContext.sql(
    s"select * from $referenceDB.$eventsourceTable " +
      s"where date_format(event_Dt, 'yyyy-MM') = '$year-$monthStr'")
  // Rank events within each event_Id (uniquedf is not used further below)
  val uniquedf = monthlyEventDf.repartition(col("event_Id"))
    .withColumn("rank", rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue"))))
  val monthlyEventfinal = monthlyEventDf.persist(StorageLevel.MEMORY_AND_DISK)

  monthlyEventfinal
}
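The Calendar arithmetic above can only ever produce the previous month. One way to fit the re-run requirement is to have the job work from a list of (start, end) windows: the previous month by default, or the rows of the re-run table when any are supplied. The sketch below uses java.time; the object and method names are hypothetical.

```scala
import java.time.{LocalDate, YearMonth}

object JobWindows {
  // Default window: the full calendar month before `today`
  // (the same period the Calendar code above computes).
  def previousMonth(today: LocalDate): (LocalDate, LocalDate) = {
    val ym = YearMonth.from(today).minusMonths(1)
    (ym.atDay(1), ym.atEndOfMonth())
  }

  // Use the manual re-run windows if any are supplied, otherwise fall back to last month.
  def windowsToRun(today: LocalDate, rerun: Seq[(LocalDate, LocalDate)]): Seq[(LocalDate, LocalDate)] =
    if (rerun.nonEmpty) rerun else Seq(previousMonth(today))
}
```

Each resulting window could then be substituted into the query as a condition like event_Dt between '$start' and '$end', instead of the hard-coded year and month.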

Where in the current module can we implement this requirement? Looking for suggestions.

Recommended answer

You can use the filter function to select the records within a range, as below:

//Input df

+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  M|2018-01-01|2018-01-31|
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

//Parameter startDate and endDate
val startDate = "2018-02-01" // assumed value: any date from 2018-01-02 through 2018-03-27 gives the output below
val endDate = "2018-05-03"

//Filter condition
df.filter(s"start_date>='$startDate' and end_date<='$endDate'").show

//Sample Output: 
+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

I hope this helps. If you want to perform calculations on the filtered records, you can pass the relevant columns to a UDF, or use Spark's built-in column functions and aggregations where they suffice.
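As an illustration of "calculation on filtered records", the sketch below computes one summary per re-run window, keyed by summary_iv. The real metric is not specified in the question, so an event count stands in for it (an assumption); in Spark the same shape would typically be a filter per window followed by a groupBy/agg, with no UDF needed for a count.

```scala
import java.time.LocalDate

object WindowSummary {
  // Hypothetical placeholder metric: number of events per window.
  // Each window is (summaryIv, start, end); event dates are filtered inclusively.
  def countPerWindow(eventDates: Seq[LocalDate], windows: Seq[(String, LocalDate, LocalDate)]): Map[String, Int] =
    windows.map { case (iv, start, end) =>
      iv -> eventDates.count(d => !d.isBefore(start) && !d.isAfter(end))
    }.toMap
}
```

Applied to the question's re-run table, this yields one entry for 2018-01 and one each for 2018-03-05 and 2018-03-27.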
