Update Kafka commit offset after successful batch insert
I have a spring-kafka consumer which reads records and hands them over to a cache. A scheduled task will clear the records in the cache periodically. I want to update the COMMIT OFFSET only after a batch has been successfully saved in the database. I tried passing the acknowledgment object to the cache service to invoke the acknowledge method as shown below.
public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add( record );
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { // called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}
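Since the snippet above leaves concurrency out, here is a minimal thread-safe sketch of the same idea. The `Acknowledgment` interface below is a stand-in for Spring Kafka's, so the buffer can be exercised without a broker; the `BATCH_SIZE` of 3 is arbitrary:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Spring Kafka's Acknowledgment; illustration only.
interface Acknowledgment {
    void acknowledge();
}

class BatchCache {
    private static final int BATCH_SIZE = 3; // arbitrary for this sketch

    private final List<String> records = new ArrayList<>();
    private Acknowledgment lastAcknowledgment;

    // Called from the consumer thread for each record.
    public synchronized void add(String record, Acknowledgment acknowledgment) {
        records.add(record);
        lastAcknowledgment = acknowledgment; // keep the most recent ack
    }

    // Called by the scheduled task; acknowledges only after a successful save.
    public synchronized boolean saveBatch() {
        if (records.size() < BATCH_SIZE) {
            return false; // not enough records yet
        }
        // ... perform batch insert into the database here ...
        lastAcknowledgment.acknowledge(); // processed by the consumer thread on its next poll
        records.clear();
        return true;
    }
}
```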
The AckMode has been set as follows:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
And the Auto Commit is false:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
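For context, the two settings above would typically sit in one Spring configuration class. The sketch below is illustrative wiring under assumptions: the broker address, group id, and deserializers are not from the question.

```java
// Illustrative Spring Kafka configuration; broker address, group id,
// and deserializers are assumptions, not taken from the question.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "cache-consumer");          // assumed
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // MANUAL: offsets are committed only when Acknowledgment.acknowledge() is called
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }
}
```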
Even though the acknowledge method is called, the commit offset is not updated. What is the best way to update the commit offset after persisting the records?
I'm using spring-kafka 2.1.7.RELEASE.
EDIT: After @GaryRussell confirmed that acknowledgements made by foreign threads are performed by the consumer thread during the next poll, I rechecked my code and found a bug in how the last acknowledgement object is set. After fixing this, the commit offset IS UPDATED as expected. So this issue has been resolved. However, I have no way to mark this question as answered.
Here is the problem: the consumer thread is responsible for committing the offset. At the time of poll, the consumer thread will commit the previous batch's offsets.
Since in your case AUTO_COMMIT is false and lastAcknowledgment.acknowledge() is never actually invoked, the offset is not committed.
The only way to do this: as soon as you get the polled records, run the scheduled task asynchronously, hold the consumer thread, and commit the offset after the async task completes. Check this answer for reference.
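A rough sketch of that hand-off, with the database save stubbed out; `Ack` below is a hypothetical stand-in for Spring Kafka's `Acknowledgment`, so the flow can be tested without a broker:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Spring Kafka's Acknowledgment; illustration only.
interface Ack {
    void acknowledge();
}

class AsyncBatchSaver {
    // Saves the batch on a worker thread and acknowledges only after the save succeeds.
    static CompletableFuture<Void> saveAndAck(List<String> batch, Ack ack) {
        return CompletableFuture
                .runAsync(() -> persist(batch)) // hand the batch off asynchronously
                .thenRun(ack::acknowledge);     // runs only if persist() did not throw
    }

    private static void persist(List<String> batch) {
        // ... batch insert into the database would go here ...
    }
}
```

The caller can block on the returned future (or chain further stages) to hold the consumer thread until the commit is safe to perform.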
Note: if you hold the consumer thread for more than 5 minutes, a rebalance will take place.
The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
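The constraint above can be captured in a tiny sanity check; the helper and constants below are illustrative, not a Kafka API, with the default values taken from the quoted documentation:

```java
// Sanity check mirroring the quoted docs:
// request.timeout.ms must be larger than max.poll.interval.ms.
class ConsumerTimeoutCheck {
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000; // 5 minutes
    static final long DEFAULT_SESSION_TIMEOUT_MS = 10_000;    // 10 seconds
    static final int DEFAULT_MAX_POLL_RECORDS = 500;

    static boolean isValid(long maxPollIntervalMs, long requestTimeoutMs) {
        return requestTimeoutMs > maxPollIntervalMs;
    }
}
```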
Special note for spring-kafka > 2.1.5: acknowledgments made on foreign threads will be performed by the consumer thread just before the next poll. Thanks to @Gary Russell for this information.