Update Kafka commit offset after successful batch insert

Problem Description

I have a spring-kafka consumer which reads records and hands them over to a cache. A scheduled task will clear the records in the cache periodically. I want to update the COMMIT OFFSET only after a batch has been successfully saved in the database. I tried passing the acknowledgment object to the cache service to invoke the acknowledge method as shown below.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaConsumer {

    private CacheService cacheService;

    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        // hand the record and its acknowledgment over to the cache;
        // the offset is only acknowledged later, after the batch insert
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    private final List<String> records = new ArrayList<>();
    private Acknowledgment lastAcknowledgment;
    private static final int BATCH_SIZE = 1000; // illustrative value

    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { // called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}
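For completeness, the periodic flush the question refers to could be wired up with Spring's scheduling support. A minimal sketch, assuming constructor injection and a fixed-delay interval, neither of which is shown in the question:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Requires @EnableScheduling on a configuration class.
@Component
public class CacheFlushScheduler {

    private final CacheService cacheService;

    public CacheFlushScheduler( CacheService cacheService ) {
        this.cacheService = cacheService;
    }

    // Flush the cache periodically; the interval is an illustrative value.
    @Scheduled( fixedDelay = 10000 )
    public void flush() {
        cacheService.saveBatch();
    }
}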

The AckMode has been set as follows:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

And the Auto Commit is false:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
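Taken together, the listener container configuration for spring-kafka 2.1.x might look roughly like the sketch below. The bootstrap servers, group id, and deserializers are assumptions added to make the example self-contained; only the ENABLE_AUTO_COMMIT_CONFIG and AckMode.MANUAL settings come from the question.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); // assumed broker address
        config.put( ConsumerConfig.GROUP_ID_CONFIG, "cache-consumer" );           // assumed group id
        config.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        config.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );            // no auto commit

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( new DefaultKafkaConsumerFactory<>( config ) );
        // Offsets are committed only when Acknowledgment.acknowledge() is called.
        factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
        return factory;
    }
}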

Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after persisting the records?

I'm using spring-kafka 2.1.7.RELEASE.


EDIT: After @GaryRussell confirmed that acknowledgements made by foreign threads are performed by the consumer thread during the next poll, I rechecked my code and found a bug in how the last acknowledgement object is set. After fixing this, the commit offset IS UPDATED as expected. So this issue has been resolved. However, I have no way to mark this question as answered.

Solution

Here is the problem: the consumer thread is responsible for committing the offset. When it polls, the consumer thread commits the offsets acknowledged for the previous batch.

Since AUTO_COMMIT is false in your case, if lastAcknowledgment.acknowledge() is not called, the offset is not committed.

The only way to do this: as soon as you get the polled records, run the scheduled save task asynchronously, hold the consumer thread, and commit the offset after the async task completes. Check this answer for reference.
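A minimal sketch of that suggestion, assuming a hypothetical RecordRepository that performs the database insert (the class and its save method are placeholders, not part of the question or of spring-kafka):

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

public class BlockingSaveListener {

    private final RecordRepository repository; // hypothetical persistence service

    public BlockingSaveListener( RecordRepository repository ) {
        this.repository = repository;
    }

    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        // Run the database save asynchronously, but hold the consumer thread until
        // it completes; only then acknowledge, so the offset is committed on the
        // next poll only after a successful insert.
        CompletableFuture
                .runAsync( () -> repository.save( record.value() ) )
                .join();

        acknowledgment.acknowledge();
    }
}

Blocking the listener like this keeps the at-least-once semantics simple, at the cost of throughput and the max.poll.interval.ms constraint described next.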

Note: if you hold the consumer thread for more than 5 minutes, a rebalance will take place.

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
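If holding the consumer thread longer is unavoidable, the relevant timeouts can be raised in the same consumer config map shown above; the values below are purely illustrative:

// Illustrative values only: allow up to 10 minutes between polls, and keep
// request.timeout.ms above max.poll.interval.ms as the quoted note requires.
config.put( ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000 );
config.put( ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 605000 );
config.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );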

Special note, from spring-kafka > 2.1.5:

Acknowledgments made on foreign threads will be performed by the consumer thread just before the next poll.

Thanks to @Gary Russell for this information.
