Update Kafka commit offset after successful batch insert


Question


I have a spring-kafka consumer which reads records and hands them over to a cache. A scheduled task will clear the records in the cache periodically. I want to update the COMMIT OFFSET only after a batch has been successfully saved in the database. I tried passing the acknowledgment object to the cache service to invoke the acknowledge method as shown below.

public class KafkaConsumer {
    private CacheService cacheService; // injected

    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    private final List<String> records = new ArrayList<>();
    private Acknowledgment lastAcknowledgment;

    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add( record );
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { // called by a scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

The AckMode has been set as follows:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

And auto commit is disabled:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
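For context, these two settings usually live together in a consumer configuration class. A minimal sketch of such a class, assuming spring-kafka 2.1.x APIs; the bean names, broker address, and group id are illustrative, not taken from the question:

```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); // assumed broker address
        config.put( ConsumerConfig.GROUP_ID_CONFIG, "cache-consumer" );          // assumed group id
        config.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        config.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        // auto commit must be off for manual acknowledgment to take effect
        config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        return new DefaultKafkaConsumerFactory<>( config );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory() );
        // MANUAL: offsets are committed only when Acknowledgment.acknowledge() is called
        factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
        return factory;
    }
}
```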

Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after persisting the records?

I'm using spring-kafka 2.1.7.RELEASE.


EDIT: After @GaryRussell confirmed that acknowledgements made by foreign threads are performed by the consumer thread during the next poll, I rechecked my code and found a bug in how the last acknowledgement object is set. After fixing this, the commit offset IS UPDATED as expected. So this issue has been resolved. However, I have no way to mark this question as answered.

Solution

Here is the problem: the consumer thread is responsible for committing the offset. At the time of poll, the consumer thread commits the offsets of the previous batch.

Since in your case AUTO_COMMIT is false and lastAcknowledgment.acknowledge() is not acknowledging, the offset is not committed.

One way to do this: as soon as you get the polled records, run the save task asynchronously while holding the consumer thread, and commit the offset after the async task completes. Check this answer for reference.
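A sketch of that pattern, assuming the same listener setup as in the question; the executor, BATCH_SIZE constant, and saveToDatabase method are illustrative names, not from the answer:

```java
public class BatchingListener {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> batch = new ArrayList<>();

    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) throws Exception {
        batch.add( record.value() );
        if( batch.size() == BATCH_SIZE ) {
            // run the database insert on another thread...
            Future<?> insert = executor.submit( () -> saveToDatabase( new ArrayList<>( batch ) ) );
            // ...but hold the consumer thread until it finishes, so the
            // commit below happens on the consumer thread itself; bound the
            // wait to stay under max.poll.interval.ms
            insert.get( 4, TimeUnit.MINUTES );
            acknowledgment.acknowledge();
            batch.clear();
        }
    }
}
```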

Note: if you hold the consumer thread for more than 5 minutes, a rebalance will take place:

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
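If the batch insert can take longer than the default 5 minutes, these timeouts can be raised in the same consumer config map; the values below are illustrative, not from the answer:

```java
// allow up to 10 minutes between poll() calls before the consumer leaves the group
config.put( ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000 );
// per the quoted note, request.timeout.ms must stay larger than max.poll.interval.ms
config.put( ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 605_000 );
// fewer records per poll also shortens the time spent processing each batch
config.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );
```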

Special note for spring-kafka >= 2.1.5:

Acknowledgments made on foreign threads will be performed by the consumer thread just before the next poll. Thanks to @Gary Russell for this information.

