How to perform unit testing on Spark Structured Streaming?

Question

I would like to know about unit testing Spark Structured Streaming. My scenario: I get data from Kafka, consume it using Spark Structured Streaming, and apply some transformations on top of the data.

I am not sure how I can test this using Scala and Spark. Can someone tell me how to do unit testing in Structured Streaming using Scala? I am new to streaming.

Answer

tl;dr Use MemoryStream to feed events into the query and the memory sink to capture the output.

以下代码应有助于入门:

The following code should help you get started:

import scala.concurrent.duration._

import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}

// Assumes an active SparkSession in scope as `spark`.
// Event, eventGen, queryName, checkpointLocation and queryOutputMode are
// placeholders to supply in your own test: an event case class, a test
// data generator, a table name, a temp directory and an OutputMode.
implicit val sqlCtx = spark.sqlContext
import spark.implicits._

// MemoryStream is an in-memory streaming source fully controlled by the test
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use the sessions event stream to apply the required transformations
val transformedSessions = ...

val streamingQuery = transformedSessions
  .writeStream
  .format("memory")    // the memory sink stores results in an in-memory table
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start

// Add events to the MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()    // block until the batch is processed
events.commit(currentOffset.asInstanceOf[LongOffset])

// The output is in the in-memory table named queryName;
// query it like any other table to make assertions on the result
spark
  .table(queryName)
  .show(truncate = false)
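
To turn this pattern into a self-contained unit test, wrap it in a test framework. Below is a minimal sketch assuming ScalaTest 3.x (AnyFunSuite); the suite name, the to-upper-case transformation standing in for your Kafka processing logic, and the "output" table name are made up for illustration:

import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.funsuite.AnyFunSuite

class UpperCaseStreamSuite extends AnyFunSuite {

  test("streaming transformation upper-cases its input") {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("structured-streaming-unit-test")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlCtx = spark.sqlContext

    // In production this Dataset would come from the Kafka source;
    // in the test, MemoryStream stands in for it.
    val input = MemoryStream[String]
    val transformed = input.toDS.map(_.toUpperCase)

    val query = transformed.writeStream
      .format("memory")
      .queryName("output")
      .option("checkpointLocation",
        Files.createTempDirectory("checkpoint").toString)
      .outputMode("append")
      .start()

    // Feed a batch, wait for it to be processed, then assert on the table
    input.addData("hello", "spark")
    query.processAllAvailable()

    val result = spark.table("output").as[String].collect().toSet
    assert(result == Set("HELLO", "SPARK"))

    query.stop()
    spark.stop()
  }
}

MemoryStream keeps the test fast and deterministic. If you also need to exercise the actual Kafka source options, an embedded Kafka library is the usual next step, but it is not required for testing the transformations themselves.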
