Spark Structured Streaming 检查点在生产中的使用 [英] Spark Structured Streaming checkpoint usage in production

查看:75
本文介绍了Spark Structured Streaming 检查点在生产中的使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在使用 Spark Structured 流时,我无法理解检查点的工作原理.

我有一个生成一些事件的 spark 进程,我将这些事件登录到 Hive 表中.对于这些事件,我会在 kafka 流中收到一个确认事件.

我创建了一个新的火花过程

  • 将 Hive 日志表中的事件读入 DataFrame
  • 使用 Spark Structured Streaming 将这些事件与确认事件流结合
  • 将连接的 DataFrame 写入 HBase 表.

我在 spark-shell 中测试了代码,它在伪代码下方运行良好(我使用的是 Scala).

val tableA = spark.table(tableA")val开始偏移=最早"val streamOfData = .readStream.format(卡夫卡").option(startingOffsets",startingOffsets).option("otherOptions", otherOptions)val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq(a"), inner")joinTableAWithStreamOfData.writeStream.foreach(writeDataToHBaseTable()).开始().awaitTermination()

现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力理解如何在此处使用检查点.

在每次运行此代码时,我希望仅从流中读取上次运行中尚未读取的事件,并将这些新事件与我的日志表进行内部连接,所以只将新数据写入最终的 HBase 表.

我在 HDFS 中创建了一个目录来存储检查点文件.我将该位置提供给我用来调用火花代码的 spark-submit 命令.

spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory--all_the_other_settings_and_libraries

此时代码每 15 分钟运行一次,没有任何错误,但它基本上没有做任何事情,因为它没有将新事件转储到 HBase 表.检查点目录也是空的,而我认为必须在那里写入一些文件?

readStream 函数是否需要调整以便从最新的检查点开始读取?

val streamOfData = .readStream.format(卡夫卡").option(startingOffsets",startingOffsets) ??.option("otherOptions", otherOptions)

我真的很难理解关于这个的 spark 文档.

先谢谢你!

解决方案

触发器

<块引用>

"现在我想安排这段代码定期运行,例如每 15 分钟一次,我正在努力理解如何在此处使用检查点.

如果您希望每 15 分钟触发一次作业,您可以使用 触发器.

您不需要使用"专门检查点,但只提供一个可靠的(例如 HDFS)检查点位置,见下文.

检查点

<块引用>

在每次运行此代码时,我只想从流中读取上次运行中尚未读取的事件 [...]"

在 Spark Structured Streaming 应用程序中从 Kafka 读取数据时,最好直接在 StreamingQuery 中设置检查点位置.Spark 使用此位置创建检查点文件,以跟踪应用程序的状态并记录已从 Kafka 读取的偏移量.

当重新启动应用程序时,它会检查这些检查点文件以了解从哪里继续从 Kafka 读取,这样它就不会跳过或错过任何消息.您不需要手动设置startingOffset.

请务必记住,仅允许对应用程序代码进行特定更改,以便检查点文件可用于安全重启.可以在 流式查询更改后的恢复语义.


总的来说,对于从 Kafka 读取数据的高效 Spark Structured Streaming 应用程序,我推荐以下结构:

val spark = SparkSession.builder().[...].getOrCreate()val streamOfData = spark.readStream.format(卡夫卡")//选项startingOffsets 仅与此应用程序第一次运行相关.之后,正在使用检查点文件..option(startingOffsets",startingOffsets).option("otherOptions", otherOptions).加载()//对流式数据帧执行任何类型的转换val 处理的StreamOfData = streamOfData.[...]val 流查询 = 处理的StreamOfData.writeStream.foreach(writeDataToHBaseTable()).option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/";.trigger(Trigger.ProcessingTime(15 分钟")).开始()streamQuery.awaitTermination()

I have troubles understanding how checkpoints work when working with Spark Structured streaming.

I have a spark process that generates some events, which I log in an Hive table. For those events, I receive a confirmation event in a kafka stream.

I created a new spark process that

  • reads the events from the Hive log table into a DataFrame
  • joins those events with the stream of confirmation events using Spark Structured Streaming
  • writes the joined DataFrame to an HBase table.

I tested the code in spark-shell and it works fine, below the pseudocode (I'm using Scala).

val tableA = spark.table("tableA")

val startingOffset = "earliest"

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets)
  .option("otherOptions", otherOptions)

val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")

joinTableAWithStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  ).start()
  .awaitTermination()

Now I would like to schedule this code to run periodically, e.g. every 15 minutes, and I'm struggling understanding how to use checkpoints here.

At every run of this code, I would like to read from the stream only the events I haven't read yet in the previous run, and inner join those new events with my log table, so to write only new data to the final HBase table.

I created a directory in HDFS where to store the checkpoint file. I provided that location to the spark-submit command I use to call the spark code.

spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory 
--all_the_other_settings_and_libraries

At this moment the code runs fine every 15 minutes without any error, but it doesn't do anything basically since it is not dumping the new events to the HBase table. Also the checkpoint directory is empty, while I assume some file has to be written there?

And does the readStream function need to be adapted so to start reading from the latest checkpoint?

val streamOfData = .readStream 
  .format("kafka") 
  .option("startingOffsets", startingOffsets) ??
  .option("otherOptions", otherOptions)

I'm really struggling to understand the spark documentation regarding this.

Thank you in advance!

解决方案

Trigger

"Now I would like to schedule this code to run periodically, e.g. every 15 minutes, and I'm struggling understanding how to use checkpoints here.

In case you want your job to be triggered every 15 minutes, you can make use of Triggers.

You do not need to "use" checkpointing specifically, but just provide a reliable (e.g. HDFS) checkpoint location, see below.

Checkpointing

At every run of this code, I would like to read from the stream only the events I haven't read yet in the previous run [...]"

When reading data from Kafka in a Spark Structured Streaming application it is best to have the checkpoint location set directly in your StreamingQuery. Spark uses this location to create checkpoint files that keep track of your application's state and also record the offsets already read from Kafka.

When restarting the application it will check these checkpoint files to understand from where to continue to read from Kafka so it does not skip or miss any message. You do not need to set the startingOffset manually.

It is important to keep in mind that only specific changes in your application's code are allowed such that the checkpoint files can be used for secure re-starts. A good overview can be found in the Structured Streaming Programming Guide on Recovery Semantics after Changes in a Streaming Query.


Overall, for productive Spark Structured Streaming applications reading from Kafka I recommend the following structure:

val spark = SparkSession.builder().[...].getOrCreate()

val streamOfData = spark.readStream 
  .format("kafka") 
// option startingOffsets is only relevant for the very first time this application is running. After that, checkpoint files are being used.
  .option("startingOffsets", startingOffsets) 
  .option("otherOptions", otherOptions)
  .load()

// perform any kind of transformations on streaming DataFrames
val processedStreamOfData = streamOfData.[...]


val streamingQuery = processedStreamOfData 
  .writeStream
  .foreach(
    writeDataToHBaseTable()
  )
  .option("checkpointLocation", "/path/to/checkpoint/dir/in/hdfs/"
  .trigger(Trigger.ProcessingTime("15 minutes"))
  .start()

streamingQuery.awaitTermination()

这篇关于Spark Structured Streaming 检查点在生产中的使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆