How to perform unit testing on Spark Structured Streaming?
Question
I would like to know how to unit test Spark Structured Streaming code. In my scenario, I read data from Kafka, consume it with Spark Structured Streaming, and apply some transformations to it.
I am not sure how to test this using Scala and Spark. Can someone explain how to unit test Structured Streaming code in Scala? I am new to streaming.
Recommended answer
tl;dr Use MemoryStream to add events and the memory sink to collect the output.
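As a fully self-contained illustration of that idea, here is a minimal sketch that runs without Kafka. It assumes a hypothetical `Event` case class, a toy filter standing in for your real transformation, a local SparkSession, and an arbitrary query/table name `clicks`; none of these come from the original answer. Note that MemoryStream lives in the internal package `org.apache.spark.sql.execution.streaming`, so details may differ between Spark versions.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical event type; substitute whatever your Kafka payload decodes to.
case class Event(userId: Long, action: String)

object MemoryStreamSketch {
  /** Pushes one batch through a toy transformation and returns the sink contents. */
  def run(): Seq[Long] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("memory-stream-sketch")
      .getOrCreate()
    import spark.implicits._
    // MemoryStream.apply needs an implicit SQLContext plus an Encoder (from spark.implicits._)
    implicit val sqlCtx: SQLContext = spark.sqlContext

    try {
      val events = MemoryStream[Event]
      // Toy transformation under test: keep only clicks and project the user id
      val clicks = events.toDS().filter(_.action == "click").map(_.userId)

      // The memory sink exposes the output as an in-memory table named after the query
      val query = clicks.writeStream
        .format("memory")
        .queryName("clicks")
        .outputMode("append")
        .start()

      events.addData(Event(1L, "click"), Event(2L, "view"), Event(3L, "click"))
      // Block until everything added so far has been processed
      query.processAllAvailable()

      val result = spark.table("clicks").as[Long].collect().sorted.toSeq
      query.stop()
      result
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```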
The following code should help you get started:
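```scala
import scala.concurrent.duration._

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}

// Assumes a SparkSession `spark` plus test-specific definitions of `Event`,
// `eventGen`, `queryName`, `checkpointLocation` and `queryOutputMode`.
implicit val sqlCtx: SQLContext = spark.sqlContext
import spark.implicits._

// MemoryStream stands in for the Kafka source
val events = MemoryStream[Event]
val sessions = events.toDS()
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use sessions event stream to apply required transformations
val transformedSessions = ...

// The memory sink stores the results in an in-memory table named queryName
val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start()

// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)

// Block until all data added so far has been processed
streamingQuery.processAllAvailable()
// Tell the source that everything up to currentOffset has been consumed
events.commit(currentOffset.asInstanceOf[LongOffset])

// check the output
// The output is in the queryName table
// The following code simply shows the result
spark
  .table(queryName)
  .show(truncate = false)
```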
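Instead of only calling show, a test can assert on the contents of the memory table after each micro-batch. The sketch below is a minimal illustration under assumptions: the `UserEvent` type, the table name `user_counts` and the per-user count used as the transformation under test are all hypothetical. It uses the complete output mode, in which the memory sink keeps only the latest full result, so the second snapshot holds the updated running counts. Setting `spark.sql.shuffle.partitions` to a small value (2 here, an arbitrary choice) is a common way to keep stateful tests fast.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical event type for illustration.
case class UserEvent(userId: Long, action: String)

object IncrementalOutputSketch {
  /** Feeds two micro-batches and returns the sink contents observed after each one. */
  def run(): (Map[Long, Long], Map[Long, Long]) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("incremental-output-sketch")
      // Few shuffle partitions keep stateful aggregations quick in tests
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    try {
      val events = MemoryStream[UserEvent]
      // Transformation under test: running count of events per user
      val counts = events.toDS().groupByKey(_.userId).count()

      val query = counts.writeStream
        .format("memory")
        .queryName("user_counts")
        .outputMode("complete")
        .start()

      // Reads the current sink contents as userId -> count (tuple columns map by ordinal)
      def snapshot(): Map[Long, Long] =
        spark.table("user_counts").as[(Long, Long)].collect().toMap

      events.addData(UserEvent(1L, "click"), UserEvent(2L, "view"))
      query.processAllAvailable()
      val afterFirst = snapshot()

      events.addData(UserEvent(1L, "view"))
      query.processAllAvailable()
      val afterSecond = snapshot()

      query.stop()
      (afterFirst, afterSecond)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```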