动态启动和关闭 KafkaListener 只是为了在会话开始时加载以前的消息 [英] Dynamically start and off KafkaListener just to load previous messages at the start of a session
问题描述
我有一个用于 kafkalistener 的工作代码,用于从主题的开头(offset=0)
(始终运行)读取消息.对于我的用例(消息传递),我需要两件事:
I have a working code for kafkalistener to read messages from the beginning(offset=0)
of a topic (always running).
For my use case (messaging) I need 2 things:
总是捕捉特定主题/分区的新消息(这个消费者总是在运行)并发送到前端 websocket+stomp.(这部分我已经有了)
always catch new messages(this consumer is always running) of specific topic/partition and send to frontend websocket+stomp. (I already have this part)
启动新的消费者以获取从特定主题/分区的开始到当前的消息,只有在前端发出信号时才停止,以便可以获取这些数据(为迟到的用户或稍后加载以前的消息)通过前端 websocket+stomp(在会话开始时)
start new consumer to get messages from beginning to current of specific topic/partition, only when frontend signals and then stop after that so that these data(loading previous messages for the late user or for later) can be fetched by frontend websocket+stomp (at the beginning of its session)
如果我可以动态地(在从前端获取信号后)添加/删除带有参数的 kafkaListener(来自 post 方法的数据),它将同时提供两个服务
实际上,我该如何实现?我应该考虑使用 post 方法通知后端我现在需要加载此主题/分区的先前消息并将其发送到此.."网址吗?但是,如何在不一直运行的情况下动态启动和关闭该消费者(kafkaListener)并在那里传递参数?
actually, how can I implement this? should I think of using post method to notify backend that I need to load previous messages of this topic/partition right now and send those it to this ".." url? but then how can I dynamically start and off that consumer(kafkaListener) without running all the time and pass parameter there?
推荐答案
这是一个快速的 Spring Boot 应用程序,展示了如何动态创建容器.
Here is a quick Spring Boot application showing how to dynamically create containers.
@SpringBootApplication
public class So61950229Application {
public static void main(String[] args) {
SpringApplication.run(So61950229Application.class, args);
}
@Bean
public ApplicationRunner runner(DynamicListener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("so61950229", "foo" + i));
System.out.println("Hit enter to start a listener");
System.in.read();
listener.newContainer("so61950229", 0);
System.out.println("Hit enter to start another listener");
System.in.read();
listener.newContainer("so61950229", 0);
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61950229").partitions(1).replicas(1).build();
}
}
@Component
class DynamicListener {
private static final Logger LOG = LoggerFactory.getLogger(DynamicListener.class);
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers
= new ConcurrentHashMap<>();
DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
this.factory = factory;
}
void newContainer(String topic, int partition) {
ConcurrentMessageListenerContainer<String, String> container =
this.factory.createContainer(new TopicPartitionOffset(topic, partition));
String groupId = UUID.randomUUID().toString();
container.getContainerProperties().setGroupId(groupId);
container.setupMessageListener((MessageListener) record -> {
System.out.println(record);
});
this.containers.put(groupId, container);
container.start();
}
@EventListener
public void idle(ListenerContainerIdleEvent event) {
AbstractMessageListenerContainer<String, String> container = this.containers.remove(
event.getContainer(ConcurrentMessageListenerContainer.class).getContainerProperties().getGroupId());
if (container != null) {
LOG.info("Stopping idle container");
container.stop(() -> LOG.info("Stopped"));
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000
这篇关于动态启动和关闭 KafkaListener 只是为了在会话开始时加载以前的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!