如何使用Kafka Stream手动提交? [英] How to commit manually with Kafka Stream?

查看:544
本文介绍了如何使用Kafka Stream手动提交?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以使用Kafka Stream手动提交?

Is there a way to commit manually with Kafka Stream?

通常使用KafkaConsumer,我会执行以下操作:

Usually with using the KafkaConsumer, I do something like below:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}

我在哪里手动调用commit.我没有看到与KStream类似的API.

Where I'm calling commit manually. I don't see a similar API for KStream.

推荐答案

提交由Streams在内部完全自动化地处理,因此通常没有理由手动提交.请注意,Streams的处理方式与使用者自动提交的处理方式不同-实际上,内部使用的使用者使用自动提交功能是禁用的,Streams是手动"管理提交的.原因是,提交只能在处理过程中的某些时候发生,以确保不会丢失任何数据(在更新状态和刷新结果方面存在许多内部依赖性).

Commits are handled by Streams internally and fully automatic, and thus there is usually no reason to commit manually. Note, that Streams handles this differently than consumer auto-commit -- in fact, auto-commit is disabled for the internally used consumer and Streams manages commits "manually". The reason is, that commits can only happen at certain points during processing to ensure no data can get lost (there a many internal dependencies with regard to updating state and flushing results).

对于更频繁的提交,可以通过StreamsConfig参数commit.interval.ms缩短提交间隔.

For more frequent commits, you can reduce commit interval via StreamsConfig parameter commit.interval.ms.

尽管如此,通过低级Processor API可以间接进行手动提交.您可以使用通过init()方法提供的context对象来调用context#commit().请注意,这只是尽快提交的请求流",而不是直接发出提交.

Nevertheless, manual commits are possible indirectly, via low-level Processor API. You can use the context object that is provided via init() method to call context#commit(). Note, that this is only a "request to Streams" to commit as soon as possible -- it's not issuing a commit directly.

这篇关于如何使用Kafka Stream手动提交?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆