Spring Kafka:轮询新消息而不是使用 `onMessage` 通知 [英] Spring Kafka: Poll for new messages instead of being notified using `onMessage`

查看:26
本文介绍了Spring Kafka:轮询新消息而不是使用 `onMessage` 通知的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的项目中使用 Spring Kafka,因为在基于 Spring 的项目中使用 Kafka 消息似乎是一个自然的选择.为了使用消息,我可以使用 MessageListener 接口.Spring Kafka 在内部负责为每条新消息调用我的 onMessage 方法.

I am using Spring Kafka in my project as it seemed a natural choice in a Spring based project to consume Kafka messages. To consume messages, I can make use of the MessageListener interface. Spring Kafka internally takes care to invoke my onMessage method for each new message.

但是,在我的设置中,我更喜欢显式轮询新消息并按顺序处理它们(这将需要几秒钟).作为一种解决方法,我可能只是阻塞在我的 onMessage 实现中,或者在内部缓冲消息.然而,这似乎违背了 Spring Kafka 的核心思想.

However, in my setting I prefer to explicitly poll for new messages and work on them sequentially (which will take a few seconds). As a workaround, I might just block inside my onMessage implementation, or buffer the messages internally. However, this seems to go against the core idea of Spring Kafka.

Kafka 的设计使消费者必须轮询符合我要求的新消息.有没有办法在 Spring Kafka 中使用这种自然"的工作流程?

Kafka is designed so that consumers have to poll for new messages, which matches my requirements. Is there a way to make use of this "natural" workflow with Spring Kafka?

对于这个用例,我应该避免使用 Spring Kafka 吗?

Should I refrain from using Spring Kafka for this use case?

KafkaConsumer 文档 指出:

The KafkaConsumer documentation states:

对于消息处理时间变化不可预测的用例,这两种选择都不够.推荐的方式处理这些情况是将消息处理移动到另一个线程,这允许消费者在处理器的同时继续调用 poll仍在工作.必须注意确保承诺偏移量不会超过实际位置.通常,您必须禁用自动提交并手动提交已处理的偏移量仅在线程完成处理后才记录(取决于您需要的交付语义).另请注意,您将需要暂停分区,以便不会从轮询中收到新记录直到线程处理完之前返回的那些.

For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.

相关问题:https://github.com/spring-projects/spring-kafka/issues/195

推荐答案

必须不断轮询消费者的问题现已解决(在 0.10.1.x 中由 KIP-62) 所以这不是不再有问题(只要您不超过 max.poll.interval.ms),默认为 5 分钟,但可以增加.

The issue with having to keep polling the consumer has now been resolved (in 0.10.1.x by KIP-62) so that's not an issue any more (as long as you don't exceed the max.poll.interval.ms) which is 5 mins by default but can be increased.

然而,如果你想轮询自己,你仍然可以使用 spring-kafka(例如,如果你使用 Boot 来获得 Spring Boot 自动配置的好处),但是你可以从DefaultKafkaConsumerFactorypoll() 它直接.

However, if you want to poll yourself, you can still use spring-kafka (e.g. to get the Spring Boot auto configuration goodness if you are using Boot), but you can get a Consumer from the DefaultKafkaConsumerFactory and poll() it directly.

这篇关于Spring Kafka:轮询新消息而不是使用 `onMessage` 通知的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆