Mocking SparkSession for unit testing


Problem description

I have a method in my Spark application that loads data from a MySQL database. The method looks something like this:

trait DataManager {

  val session: SparkSession

  def loadFromDatabase(input: Input): DataFrame = {
    session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
      input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
  }
}

The method does nothing other than execute the jdbc call and load data from the database. How can I test this method? The standard approach would be to create a mock of the session object, which is an instance of SparkSession. But since SparkSession has a private constructor, I was not able to mock it with ScalaMock.

The main question here is: my function is a purely side-effecting function (the side effect being pulling data from a relational database), so how can I unit test it, given that I have trouble mocking SparkSession?

So is there any way I can mock SparkSession, or any better way than mocking to test this method?

Recommended answer

In your case I would recommend not mocking the SparkSession. That would more or less mock the entire function (which you could do anyway). If you want to test this function, my suggestion is to run an embedded database (such as H2) and use a real SparkSession. To do this, you need to provide the SparkSession to your DataManager.

Untested sketch:

Your code:

class DataManager(session: SparkSession) {

  def loadFromDatabase(input: Input): DataFrame = {
    session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
      input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
  }
}
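
With the constructor-injected design above, a test can build its own local SparkSession and pass it in. A minimal sketch, assuming the test runs outside a cluster (the local[1] master and the appName are assumptions, not part of the original answer):

import org.apache.spark.sql.SparkSession

// Build a local SparkSession for tests; master("local[1]") keeps execution
// inside the test JVM, so no cluster is required.
val session: SparkSession = SparkSession.builder()
  .appName("DataManagerTest")
  .master("local[1]")
  .getOrCreate()

// Inject the session into the class under test.
val dm = new DataManager(session)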

Your test case:

import java.sql.DriverManager

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class DataManagerTest extends FunSuite with BeforeAndAfterAll {

  override def beforeAll(): Unit = {
    val conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", "")
    // your insert statements go here
    conn.close()
  }

  test("should load data from database") {
    val dm = new DataManager(SparkSession.builder().getOrCreate())
    val input = Input(jdbcUrl = "jdbc:h2:~/test", selectQuery = "SELECT whateveryouneed FROM whereeveryouputit")
    val expectedData = dm.loadFromDatabase(input)
    // assert on expectedData here
  }
}
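
To make the beforeAll step concrete, it could create and populate a table in H2 before Spark reads it back. A hypothetical seeding sketch; the table and column names (users, id, name) are illustrative only and must match whatever Input.selectQuery selects, and the H2 driver has to be on the test classpath so Spark's JDBC reader can load it:

import java.sql.DriverManager

// Hypothetical H2 seeding for the test above; table layout and rows are illustrative.
val conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", "")
try {
  val stmt = conn.createStatement()
  stmt.execute("CREATE TABLE IF NOT EXISTS users (id BIGINT PRIMARY KEY, name VARCHAR(255))")
  stmt.execute("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
} finally {
  conn.close()
}

Note that the partitioned jdbc read in loadFromDatabase also needs a numeric column for input.columnName (here id would work), with the partition bounds 0L to input.maxId covering the seeded ids.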
