Spark: Unit Test - I have one function that unions 3 input datasets. Should I do Unit test on them?


Problem Description

I have written the following code:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object Cal {
  // Merges the three input Datasets into one for further processing.
  def mergedatasets(df: Dataset[Row], df1: Dataset[Row], df2: Dataset[Row]): Dataset[Row] = {
    df.union(df1).union(df2)
    //other logic
  }
}

object readDataframes {
  def readFirstDF(spark: SparkSession): Dataset[Row] = {
    spark.read.json(somefile)
  }
  def readSecondDF(spark: SparkSession): Dataset[Row] = {
    spark.read.json(somefile)
  }
  def readThirdDF(spark: SparkSession): Dataset[Row] = {
    spark.read.json(somefile)
  }
}

In the above code I am reading 3 files and then merging them into one, which I use further for processing. Based on the above scenario my questions are as follows:

  1. Does it make sense to unit test the function mergedatasets? If yes, what are the basic/minimal things to test, and how do I check corner cases, if any?
  2. Does it make sense to unit test readDataframes? If yes, what should be tested? Should I check that the inferred schema is as expected? Anything else?

I would like to extend the above questions to the following functions too

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def timeIntervalAgg(df: Dataset[Row]): Dataset[Row] = {

    // Sum the time spent per (id, page_number) pair.
    val timeInterval = df
      .groupBy("id", "page_number")
      .agg(sum("timeInterval").alias("timeInterval"))
    timeInterval

  }

  def timeInterval(df: Dataset[Row]): Dataset[Row] ={

    val windowSpec = Window.partitionBy("id").orderBy("date_time")
    val timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
    val endTime = lead(col("date_time"),1).over(windowSpec)
    val startTime = col("date_time")
    val timeDiff = (unix_timestamp(endTime, timeFmt)
      - unix_timestamp(startTime, timeFmt))
    val timeInterval = df
      .withColumn("timeInterval", lit(when(col("event") === "this_event",lit(null)
        .cast("long"))
        .otherwise(timeDiff)))
      .where("""event != "this_event" """)
    timeInterval

  }

  def addOddpages(df: Dataset[Row]) :Dataset[Row] = {

    val odd = df
      .where("""view_mode = "twin" """)
      .withColumn("page_odd", col("page") + 1)
      .drop("page")
      .select(col("id"), col("date_time")
        .cast("timestamp"),col("page_odd")
        .alias("page"), col("page_view_mode"),
        col("event"),col("timeInterval"))
    val timeIntervalWithoddPage = df.union(odd)
    timeIntervalWithoddPage

  }

  • Please suggest whether the code needs to be refactored in a better way to enable better testing.

My goal is to understand what to test for and what to look out for while writing tests for code like the above. All these questions are about unit testing Spark code, not testing code in other languages.

Answer

Read JSON files: If you just read JSON files, you don't need to test this. In addition, it might be better to read the files with an explicit schema via schema() to avoid issues with schema inference. Also, you do not need 3 identical methods for reading the files.
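For illustration, the three readers could collapse into one method that takes a path and an explicit schema. This is a minimal sketch; the field names (id, date_time, page, event) are assumptions for the example, not taken from the original post:

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types._

object readDataframes {
  // Hypothetical schema; adjust the fields to match the actual JSON files.
  val eventSchema: StructType = StructType(Seq(
    StructField("id", StringType),
    StructField("date_time", StringType),
    StructField("page", IntegerType),
    StructField("event", StringType)
  ))

  // One parameterized reader replaces readFirstDF / readSecondDF / readThirdDF.
  def readDF(spark: SparkSession, path: String): Dataset[Row] =
    spark.read.schema(eventSchema).json(path)
}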

Union Datasets: Since Spark 2.3.0 there is the unionByName() function. That function resolves columns by name (not by position). You can consider it to avoid issues with union() when your DataFrames have a different column order. Of course, this function itself doesn't need to be tested. It's hard to say anything about the //other logic code inside the mergedatasets() method.
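As a sketch, mergedatasets() rewritten with unionByName() could look like this (the //other logic from the question is omitted):

import org.apache.spark.sql.{Dataset, Row}

object Cal {
  // Resolves columns by name, so a different column order in one of the
  // inputs does not silently mix up the data.
  def mergedatasets(df: Dataset[Row], df1: Dataset[Row], df2: Dataset[Row]): Dataset[Row] =
    df.unionByName(df1).unionByName(df2)
}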

For unit testing, you can use ScalaTest or other tools.

  • Create SparkSession with master("local");
  • Create a DataFrame with the expected data;
  • Create an input DataFrame for each method you want to test;
  • Compare expected and actual DataFrames (a minimal sketch follows below).
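For example, a small ScalaTest suite for timeIntervalAgg() might look like the following. The test data, the AnyFunSuite style, and placing timeIntervalAgg on the Cal object are all assumptions made for illustration:

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class CalSpec extends AnyFunSuite {

  // Local SparkSession; enough for small in-memory test DataFrames.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("unit-tests")
    .getOrCreate()

  import spark.implicits._

  test("timeIntervalAgg sums timeInterval per id and page_number") {
    // Input DataFrame with hypothetical values.
    val input = Seq(
      ("u1", 1, 10L),
      ("u1", 1, 20L),
      ("u1", 2, 5L)
    ).toDF("id", "page_number", "timeInterval")

    // Expected result after the aggregation.
    val expected = Seq(
      ("u1", 1, 30L),
      ("u1", 2, 5L)
    ).toDF("id", "page_number", "timeInterval")

    val actual = Cal.timeIntervalAgg(input)

    // For small DataFrames, collecting and comparing as sets is sufficient.
    assert(actual.collect().toSet == expected.collect().toSet)
  }
}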

The following project might be useful. You can find there how to compare two DataFrames, and there are several examples in the README: https://github.com/MrPowers/spark-fast-tests
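For instance, based on that library's README, a test for mergedatasets() could delegate the comparison to the DataFrameComparer trait. This is only a sketch; the import path and the assertSmallDataFrameEquality method name should be checked against the version of spark-fast-tests you use:

import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class MergeDatasetsSpec extends AnyFunSuite with DataFrameComparer {

  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("merge-tests")
    .getOrCreate()

  import spark.implicits._

  test("mergedatasets unions the three inputs") {
    val df1 = Seq(("u1", 1)).toDF("id", "page")
    val df2 = Seq(("u2", 2)).toDF("id", "page")
    val df3 = Seq(("u3", 3)).toDF("id", "page")

    val expected = Seq(("u1", 1), ("u2", 2), ("u3", 3)).toDF("id", "page")

    // Compares both schema and rows of two small DataFrames.
    assertSmallDataFrameEquality(Cal.mergedatasets(df1, df2, df3), expected)
  }
}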
