如何为foreachBatch的batchId设置起点? [英] How to setup a starting point for the batchId of foreachBatch?

查看:191
本文介绍了如何为foreachBatch的batchId设置起点?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我面临的问题是我的过程依赖于foreachBatch的batchId作为某种控制,以准备流水线第二阶段的准备工作.因此,只有在第一阶段(批次)完成后,才能进入第二阶段.

The problem that I am facing is that my process relies on the batchId of the foreachBatch as some sort of control of what is ready to the second stage of the pipeline. So it wil only go to the second stage if the first stage (batch) is completed.

我想保证在出现问题的情况下,流可以从停止处继续.

I want to guarantee that in case of something goes wrong, the stream can continue from where it stopped.

我们试图通过将所有完成的批次添加到增量表中来进行一些控制,但是,我找不到设置初始batchId的方法.

We tried to do some control by adding all completed batchs to a delta table, however, I couldn't find a way to set the initial batchId.

推荐答案

我想保证在出现问题的情况下,流可以从停止处继续.

I want to guarantee that in case of something goes wrong, the stream can continue from where it stopped.

这是foreachBatch接收器的checkpointLocation选项,在出现问题时用作预写日志(WAL).

That's the checkpointLocation option of the foreachBatch sink that is used as a write-ahead log (WAL) in case of problems.

引用官方文档:

最后,系统通过检查点和预写日志来确保端到端的一次容错保证.

Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs.

然后在通过检查点从故障中恢复:

万一发生故障或有意关闭时,您可以恢复上一个查询的先前进度和状态,并在中断的地方继续进行.这是通过使用检查点日志和预写日志来完成的.您可以使用查询点位置配置查询,查询会将所有进度信息(即每个触发器处理的偏移量范围)和正在运行的聚合(例如快速示例中的字数)保存到检查点位置.此检查点位置必须是与HDFS兼容的文件系统中的路径,并且可以在启动查询时在DataStreamWriter中设置为选项.

In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query.

我认为这完全涵盖了您的用例.

I think that covers your use case exactly.

我找不到设置初始batchId的方法.

I couldn't find a way to set the initial batchId.

这需要在流查询的checkpointLocation选项中使用具有预期批次ID的预填充目录.

That'd require to use a pre-populated directory with the expected batch ID in the checkpointLocation option of a streaming query.

您可以轻松地自己创建必要的文件,然后让恢复的流查询从目录开始.

You could simply create the necessary files yourself and let resumed streaming queries start from the directory.

(我以前从未尝试过,但看起来可行).

(I've never tried it out myself before, but looks doable).

这篇关于如何为foreachBatch的batchId设置起点?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆