批量记录处理后如何提交卡夫卡抵销 [英] How to submit kafka offset after processing batch records
本文介绍了批量记录处理后如何提交卡夫卡抵销的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在使用spring-kafka
并使用Kafka主题中的批量记录,并提交偏移量AbstractMessageListenerContainer.AckMode.BATCH
。
List
记录分配给一个线程(名称:ProcessThread
),该线程将处理所有记录并将结果返回给使用者线程,然后使用者线程将记录结果。(在此过程中,使用者线程将一直等待,直到它从ProcessThread
获得结果,这会导致性能低下。
ProcessThread
有没有办法向Kafka提交Offset?因此使用者线程不需要等待,它将为每个轮询创建一个新processThread
在我的例子中,我的主题有20个分区和10个Pod,每个Pod有2个消费者线程(Spring Kafka并发消费者),每个轮询100条记录(使用Spring Boot线程处理所有这些记录@Async
)
通过上面的配置,我可以在2小时内处理100万条记录,我需要将其拖到至少40分钟。
😊感谢任何帮助
推荐答案
您应该至少升级到1.3.7。当前版本为2.1.10。其中没有单独的线索。只有使用者线程可以发送偏移量。使用者不是线程安全的。
您不应将其移交给单独的线程。使用更高的并发性和更多的分区。
这篇关于批量记录处理后如何提交卡夫卡抵销的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文