Update Kafka commit offset after successful batch insert
Question
I have a spring-kafka consumer which reads records and hands them over to a cache. A scheduled task will clear the records in the cache periodically. I want to update the COMMIT OFFSET only after a batch has been successfully saved in the database. I tried passing the acknowledgment object to the cache service to invoke the acknowledge method as shown below.
public class KafkaConsumer {

    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    private final List<String> records = new ArrayList<>();
    private Acknowledgment lastAcknowledgment;

    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add( record );
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { // called by the scheduled task
        if ( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}
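The batch-then-acknowledge flow above can be exercised without a broker. In this sketch, `Ack` is a hypothetical stand-in for Spring Kafka's `Acknowledgment`, and `BATCH_SIZE` is given an illustrative value; the point is only that `acknowledge()` fires after the batch is persisted, never before:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring Kafka's Acknowledgment interface.
interface Ack {
    void acknowledge();
}

class CacheService {
    static final int BATCH_SIZE = 3; // illustrative value

    private final List<String> records = new ArrayList<>();
    private Ack lastAcknowledgment;

    public synchronized void add(String record, Ack acknowledgment) {
        records.add(record);
        lastAcknowledgment = acknowledgment; // keep only the most recent ack
    }

    // Called by the scheduled task: persist the batch, then acknowledge.
    public synchronized void saveBatch() {
        if (records.size() >= BATCH_SIZE) {
            // perform batch insert into database here
            lastAcknowledgment.acknowledge(); // commit offset only after the save
            records.clear();
        }
    }
}
```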
The AckMode has been set as follows:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
And auto commit is disabled:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
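For reference, these two settings usually sit together in the consumer configuration. The following is a sketch of a spring-kafka 2.1.x setup under assumed bean names and placeholder connection values, not the asker's actual configuration:

```java
// Sketch only: bean names, bootstrap servers, and group id are placeholders.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // offsets are committed only when Acknowledgment.acknowledge() is called
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-consumer");          // placeholder
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // no auto commit
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(config);
}
```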
Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after persisting the records?
I'm using spring-kafka 2.1.7.RELEASE.
EDIT: After @GaryRussell confirmed that acknowledgements made by foreign threads are performed by the consumer thread during the next poll, I rechecked my code and found a bug in how the last acknowledgement object is set. After fixing this, the commit offset IS UPDATED as expected. So this issue has been resolved. However, I have no way to mark this question as answered.
Here is the problem: the consumer thread is responsible for committing the offset. At the time of the next poll, the consumer thread commits the offsets of the previous batch. Since in your case AUTO_COMMIT is false and lastAcknowledgment.acknowledge() never takes effect, the offset is not committed.
There is one way to do this: as soon as you get the polled records, run the scheduled task asynchronously, hold the consumer thread, and commit the offset after the async task completes. Check this answer for reference.
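The "hold the consumer thread until the async save finishes" idea can be sketched with a plain ExecutorService. Blocking on the Future keeps the listener thread occupied so the offset is acknowledged only after the insert completes; `saveToDatabase` and the `Runnable` acknowledge callback are stand-ins, not Spring Kafka API:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchListener {
    // Daemon thread so this sketch does not keep the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical batch handler: submit the insert asynchronously,
    // then hold the consumer thread until it completes.
    public boolean handleBatch(List<String> batch, Runnable acknowledge) {
        Future<?> save = executor.submit(() -> saveToDatabase(batch));
        try {
            save.get(); // consumer thread blocks here (mind max.poll.interval.ms)
        } catch (Exception e) {
            return false; // do not acknowledge; the batch will be redelivered
        }
        acknowledge.run(); // commit the offset only after a successful save
        return true;
    }

    private void saveToDatabase(List<String> batch) {
        // placeholder for the real batch insert
    }
}
```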
Note: if you hold the consumer thread for more than 5 minutes, a rebalance will take place:
The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
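If the save can hold the consumer thread longer than the 5-minute default, the timeouts quoted above can be raised in the consumer configuration. A sketch with illustrative values only, following the quoted constraint that request.timeout.ms must exceed max.poll.interval.ms:

```java
// Illustrative values only; tune to your actual batch-insert latency.
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // 10 minutes
config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 605_000);   // must exceed max.poll.interval.ms
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000);    // default, shown for reference
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);         // default, shown for reference
```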
Special note for spring-kafka > 2.1.5:
Acknowledgments made on foreign threads will be performed by the consumer thread just before the next poll. Thanks to @Gary Russell for this information.