Spring Kafka:轮询新消息,而不是使用onMessage进行通知 [英] Spring Kafka: Poll for new messages instead of being notified using `onMessage`

查看:683
本文介绍了Spring Kafka:轮询新消息,而不是使用onMessage进行通知的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的项目中使用Spring Kafka,因为在基于Spring的项目中使用Kafka消息似乎是很自然的选择.要使用消息,我可以使用MessageListener接口. Spring Kafka在内部负责为每条新消息调用我的onMessage方法.

I am using Spring Kafka in my project as it seemed a natural choice in a Spring based project to consume Kafka messages. To consume messages, I can make use of the MessageListener interface. Spring Kafka internally takes care to invoke my onMessage method for each new message.

但是,在我的设置中,我希望显式轮询新消息并按顺序进行处理(这将需要几秒钟).作为一种解决方法,我可能只是阻塞我的onMessage实现内部,或在内部缓冲消息.但是,这似乎违背了Spring Kafka的核心思想.

However, in my setting I prefer to explicitly poll for new messages and work on them sequentially (which will take a few seconds). As a workaround, I might just block inside my onMessage implementation, or buffer the messages internally. However, this seems to go against the core idea of Spring Kafka.

Kafka的设计使消费者不得不轮询新邮件,这符合我的要求.有没有办法在Spring Kafka中利用这种自然"的工作流程?

Kafka is designed so that consumers have to poll for new messages, which matches my requirements. Is there a way to make use of this "natural" workflow with Spring Kafka?

在这种使用情况下,我应该避免使用Spring Kafka吗?

Should I refrain from using Spring Kafka for this use case?

文档状态:

对于消息处理时间意外变化的用例, 这些选项都不足够.推荐的方式 处理这些情况是将消息处理移至另一个线程, 允许消费者在处理器处理过程中继续调用民意调查 仍在工作.必须采取一定的措施以确保承诺 偏移量不超过实际位置.通常,您必须 禁用自动提交并手动提交已处理的偏移量 仅在线程完成处理后记录(取决于 您需要的投放语义).另请注意,您将需要 暂停分区,以便从轮询中没有收到新记录 直到线程处理完之前返回的内容为止.

For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.

相关问题: https://github.com/spring-projects/spring-kafka/issues/195

推荐答案

必须继续轮询消费者的问题现已解决(在0.10.1.x中,由

The issue with having to keep polling the consumer has now been resolved (in 0.10.1.x by KIP-62) so that's not an issue any more (as long as you don't exceed the max.poll.interval.ms) which is 5 mins by default but can be increased.

但是,如果要轮询自己,则仍然可以使用spring-kafka(例如,如果使用Boot,则可以获取Spring Boot自动配置的优点),但是可以从DefaultKafkaConsumerFactoryConsumer中获得Consumer. poll()直接.

However, if you want to poll yourself, you can still use spring-kafka (e.g. to get the Spring Boot auto configuration goodness if you are using Boot), but you can get a Consumer from the DefaultKafkaConsumerFactory and poll() it directly.

这篇关于Spring Kafka:轮询新消息,而不是使用onMessage进行通知的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆