如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录? [英] How to gracefully shut down reactive kafka-consumer and commit last processed record?
本文介绍了如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我苦苦寻找此功能的过程在令人作呕的日志问题Several last offsets aren't getting commited with reactive kafka中得到了充分的描述,它显示了我多次尝试不同的失败。
如何订阅ReactiveKafkaConsumerTemplate<String, String>
,它将以同步方式处理记录(为简单起见),并将每2秒确认/提交一次,并在手动取消流时?即。它起作用,每2秒确认/提交一次。然后,通过REST/jmx/Anywhere信号,流终止并确认/提交最后处理的Kafka记录。
推荐答案
经过多次尝试后,我能够想出以下解决方案。它看起来很管用,但有点难看,因为它是非常简单的,外部流高度依赖于其他方法内部发生的事情。请批评并提出改进建议。谢谢。
kafkaReceiver.receive()
.flatMapSequential(receivedKafkaRecord -> processKafkaRecord(receivedKafkaRecord), 16)
.takeWhile(e-> !stopped)
.sample(configuration.getKafkaConfiguration().getCommitInterval())
.concatMap(offset -> {
log.debug("ack/commit offset {}", offset.offset());
offset.acknowledge();
return offset.commit();
})
.doOnTerminate(()-> log.info("stopped."));
什么不起作用:
A)您不能使用Disposable.Dispose,因为这会中断流,并且不会提交您最近处理的记录。
B)您不能将Take放在流的顶部,因为这会取消流,并且您也将无法提交。
C)不确定我如何才能在这里跨公司使用错误。
由于不起作用流终止由名为stopped
的布尔字段触发,可任意设置。
流程说明:
- Flat MapSequential-由于内部并行性,并且只有在处理完所有N-1个时才需要提交N。
- process KafkaRecord返回
Mono<ReceiverOffset>
,即。要确认/提交的已处理记录的偏移量。当stopped
时,该方法将跳过处理并返回Mono.Empty take
如果停止将停止流,这必须放在此处,因为整个sample
间隔可能只由&q;空&q; 组成
- REST很简单:按给定间隔采样,按顺序提交。如果Sample返回空记录,则跳过提交。最后,我们记录该流被取消。
如果有人知道如何改进,请批评。
这篇关于如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文