批量记录处理后如何提交卡夫卡抵销 [英] How to submit kafka offset after processing batch records

查看:47
本文介绍了批量记录处理后如何提交卡夫卡抵销的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用spring-kafka并使用Kafka主题中的批量记录,并提交偏移量AbstractMessageListenerContainer.AckMode.BATCH

在我的例子中,处理批处理记录需要时间(大约20秒),而使用者线程等待批处理过程完成,然后再次执行轮询(在这次轮询时提交偏移量)。在本例中,我将List记录分配给一个线程(名称:ProcessThread),该线程将处理所有记录并将结果返回给使用者线程,然后使用者线程将记录结果。(在此过程中,使用者线程将一直等待,直到它从ProcessThread获得结果,这会导致性能低下。

ProcessThread有没有办法向Kafka提交Offset?因此使用者线程不需要等待,它将为每个轮询创建一个新processThread

在我的例子中,我的主题有20个分区和10个Pod,每个Pod有2个消费者线程(Spring Kafka并发消费者),每个轮询100条记录(使用Spring Boot线程处理所有这些记录@Async)

通过上面的配置,我可以在2小时内处理100万条记录,我需要将其拖到至少40分钟。

😊感谢任何帮助

推荐答案

您应该至少升级到1.3.7。当前版本为2.1.10。其中没有单独的线索。只有使用者线程可以发送偏移量。使用者不是线程安全的。

您不应将其移交给单独的线程。使用更高的并发性和更多的分区。

这篇关于批量记录处理后如何提交卡夫卡抵销的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆