如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录? [英] How to gracefully shut down reactive kafka-consumer and commit last processed record?

查看:0
本文介绍了如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我苦苦寻找此功能的过程在令人作呕的日志问题Several last offsets aren't getting commited with reactive kafka中得到了充分的描述,它显示了我多次尝试不同的失败。

如何订阅ReactiveKafkaConsumerTemplate<String, String>,它将以同步方式处理记录(为简单起见),并将每2秒确认/提交一次,并在手动取消流时?即。它起作用,每2秒确认/提交一次。然后,通过REST/jmx/Anywhere信号,流终止并确认/提交最后处理的Kafka记录。

推荐答案

经过多次尝试后,我能够想出以下解决方案。它看起来很管用,但有点难看,因为它是非常简单的,外部流高度依赖于其他方法内部发生的事情。请批评并提出改进建议。谢谢。

kafkaReceiver.receive()
    .flatMapSequential(receivedKafkaRecord -> processKafkaRecord(receivedKafkaRecord), 16)
    .takeWhile(e-> !stopped)
    .sample(configuration.getKafkaConfiguration().getCommitInterval())
    .concatMap(offset -> {
        log.debug("ack/commit offset {}", offset.offset());
        offset.acknowledge();
        return offset.commit();
    })
    .doOnTerminate(()-> log.info("stopped."));

什么不起作用:

A)您不能使用Disposable.Dispose,因为这会中断流,并且不会提交您最近处理的记录。

B)您不能将Take放在流的顶部,因为这会取消流,并且您也将无法提交。

C)不确定我如何才能在这里跨公司使用错误。

由于不起作用流终止由名为stopped的布尔字段触发,可任意设置。

流程说明:

  1. Flat MapSequential-由于内部并行性,并且只有在处理完所有N-1个时才需要提交N。
  2. process KafkaRecord返回Mono<ReceiverOffset>,即。要确认/提交的已处理记录的偏移量。当stopped时,该方法将跳过处理并返回Mono.Empty
  3. take如果停止将停止流,这必须放在此处,因为整个sample间隔可能只由&q;空&q;
  4. 组成
  5. REST很简单:按给定间隔采样,按顺序提交。如果Sample返回空记录,则跳过提交。最后,我们记录该流被取消。

如果有人知道如何改进,请批评。

这篇关于如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆