How does Structured Streaming ensure exactly-once writing semantics for file sinks?

Problem Description

I am writing a storage writer for Spark Structured Streaming that will partition the given dataframe and write it to a different blob storage account. The Spark documentation says that it ensures exactly-once semantics for file sinks, but it also says that exactly-once semantics are only possible if the source is replayable and the sink is idempotent.

  1. Is the blob store an idempotent sink if I write in parquet format?

  2. Also, how will the behavior change if I am doing streamingDF.writeStream.foreachBatch(...writing the DF here...).start()? Will it still guarantee exactly-once semantics?

Update #1: Something like -

import org.apache.spark.sql.DataFrame

// storagePaths is assumed to be an IndexedSeq[String] of target
// directories and r a scala.util.Random instance.
output
      .writeStream
      .foreachBatch { (df: DataFrame, _: Long) =>
        // pick one of the three target paths at random
        val path = storagePaths(r.nextInt(3))

        df.persist()
        df.write.parquet(path)
        df.unpersist()
      }
      .start()

Answer

Micro-Batch Stream Processing

I assume that the question is about Micro-Batch Stream Processing (not Continuous Stream Processing).

Exactly-once semantics are guaranteed based on the available and committed offsets internal registries (for the current stream execution, a.k.a. runId), as well as regular checkpoints (which persist processing state across restarts).
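
As a minimal sketch of where those registries live (all paths and the schema below are hypothetical), a file-sink query only needs a checkpointLocation; Structured Streaming then maintains the offset and commit logs under it:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-sink-demo").getOrCreate()

// A replayable source: a directory of JSON files (a schema is mandatory here).
val input = spark.readStream
  .schema("id LONG, value STRING")
  .json("/data/incoming")

// A file sink; the exactly-once bookkeeping lives under checkpointLocation.
val query = input.writeStream
  .format("parquet")
  .option("path", "/data/output")
  .option("checkpointLocation", "/data/checkpoints")
  .start()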

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.

It is possible that whatever has already been processed but not recorded properly internally (see below) gets re-processed:

  • That means that all streaming sources in a streaming query should be replayable, to allow polling for data that has once been requested.

  • That also means that the sink should be idempotent, so that data which was processed successfully and added to the sink can be added again without producing duplicates, because a failure may happen just before Structured Streaming manages to record the data (offsets) as successfully processed in the checkpoint (see the sketch after this list).
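
One common way to make a foreachBatch sink behave idempotently, and thus effectively exactly-once across replays (an illustrative pattern, not what the question's Update #1 code does), is to derive the write target from the batchId, so that a replayed batch overwrites its own earlier, possibly partial, output instead of appending duplicates. A minimal sketch with hypothetical paths:

import org.apache.spark.sql.DataFrame

val query = output.writeStream
  .option("checkpointLocation", "/data/checkpoints")
  .foreachBatch { (df: DataFrame, batchId: Long) =>
    // Deterministic per-batch directory: replaying the same batchId
    // rewrites the same location, so retries do not duplicate data.
    val path = s"/data/output/batch_id=$batchId"
    df.write.mode("overwrite").parquet(path)
  }
  .start()

Note that the Spark documentation itself states that foreachBatch provides only at-least-once write guarantees, which is exactly why the write inside it has to be made idempotent, as sketched above.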

Before the available data (by offset) of any of the streaming sources or readers is processed, MicroBatchExecution commits the offsets to the Write-Ahead Log (WAL) and prints out the following INFO message to the logs:

Committed offsets for batch [currentBatchId]. Metadata [offsetSeqMetadata]

A streaming query (a micro-batch) is executed only when there is new data available (based on offsets), or when the last execution requires another micro-batch for state management.

In the addBatch phase, MicroBatchExecution requests the one and only Sink or StreamWriteSupport to process the available data.
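
For reference, the internal Sink contract behind addBatch is essentially the following (simplified from org.apache.spark.sql.execution.streaming.Sink). The batchId parameter is what allows a sink to act idempotently: the built-in FileStreamSink, for instance, records every batch it has written in its _spark_metadata log and skips a batchId it has already committed.

import org.apache.spark.sql.DataFrame

trait Sink {
  // Adds a batch of data to the sink. An idempotent sink is expected to
  // skip a batchId it has already committed instead of writing it twice.
  def addBatch(batchId: Long, data: DataFrame): Unit
}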

Once a micro-batch finishes successfully, MicroBatchExecution commits the available offsets to the commits checkpoint, and the offsets are considered processed already.
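
For illustration, after two successfully completed batches the checkpoint location from the earlier sketch would contain roughly the following (the exact layout can vary across Spark versions; directories such as state/ and sources/ appear only when the query needs them):

/data/checkpoints/
  metadata     // the query's unique id
  offsets/     // the WAL: one file per batch, written before processing starts
    0
    1
  commits/     // one file per batch, written after the batch completes
    0
    1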

MicroBatchExecution then prints out the following DEBUG message to the logs:

Completed batch [currentBatchId]
