如何使用 Kafka Stream 手动提交? [英] How to commit manually with Kafka Stream?

查看:72
本文介绍了如何使用 Kafka Stream 手动提交?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有没有办法用 Kafka Stream 手动提交?

Is there a way to commit manually with Kafka Stream?

通常使用 KafkaConsumer,我会执行以下操作:

Usually with using the KafkaConsumer, I do something like below:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}

我手动调用提交的地方.我没有看到 KStream 有类似的 API.

Where I'm calling commit manually. I don't see a similar API for KStream.

推荐答案

提交由 Streams 在内部处理且全自动处理,因此通常没有理由手动提交.请注意,Streams 处理此问题的方式与消费者自动提交不同——事实上,内部使用的消费者禁用了自动提交,而 Streams 则手动"管理提交.原因是,提交只能在处理过程中的某些点发生,以确保不会丢失任何数据(在更新状态和刷新结果方面存在许多内部依赖性).

Commits are handled by Streams internally and fully automatic, and thus there is usually no reason to commit manually. Note, that Streams handles this differently than consumer auto-commit -- in fact, auto-commit is disabled for the internally used consumer and Streams manages commits "manually". The reason is, that commits can only happen at certain points during processing to ensure no data can get lost (there a many internal dependencies with regard to updating state and flushing results).

对于更频繁的提交,您可以通过 StreamsConfig 参数 commit.interval.ms 减少提交间隔.

For more frequent commits, you can reduce commit interval via StreamsConfig parameter commit.interval.ms.

尽管如此,手动提交还是可以通过低级处理器 API 间接进行的.您可以使用通过 init() 方法提供的 context 对象来调用 context#commit().请注意,这只是尽快提交的对 Streams 的请求"——它不是直接发出提交.

Nevertheless, manual commits are possible indirectly, via low-level Processor API. You can use the context object that is provided via init() method to call context#commit(). Note, that this is only a "request to Streams" to commit as soon as possible -- it's not issuing a commit directly.

这篇关于如何使用 Kafka Stream 手动提交?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆