再次使用来自 kafka 日志压缩主题的消息 [英] Consuming again messages from kafka log compaction topic
问题描述
我有一个带有 Kafka 消费者的 Spring 应用程序,使用 @KafkaListerner 注释.正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的场景.以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置.
I have a spring application with a Kafka consumer using a @KafkaListerner annotation. The topic being consumed is log compacted and we might have the scenario where we must consume again the topic messages. What's the best way to achieve this programmatically? We don't control the Kafka topic configuration.
推荐答案
@KafkaListener(...)
public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
if (this.resetNeeded) {
consumer.seekToBeginning(consumer.assignment());
this.resetNeeded = false;
}
}
如果您想在侦听器空闲(无记录)时重置,您可以启用空闲事件并通过侦听 ApplicationListener
或 ListenerContainerIdleEvent
中的 ListenerContainerIdleEvent
来执行搜索代码>@EventListener 方法.
If you want to reset when the listener is idle (no records) you can enable idle events and perform the seeks by listening for a ListenerContainerIdleEvent
in an ApplicationListener
or @EventListener
method.
该事件引用了消费者.
编辑
@SpringBootApplication
public class So58769796Application {
public static void main(String[] args) {
SpringApplication.run(So58769796Application.class, args);
}
@KafkaListener(id = "so58769796", topics = "so58769796")
public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("One:" + key + ":" + value);
}
@KafkaListener(id = "so58769796a", topics = "so58769796")
public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("Two:" + key + ":" + value);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58769796")
.compact()
.partitions(1)
.replicas(1)
.build();
}
boolean reset;
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so58769796", "foo", "bar");
System.out.println("Hit enter to rewind");
System.in.read();
this.reset = true;
};
}
@EventListener
public void listen(ListenerContainerIdleEvent event) {
System.out.println(event);
if (this.reset && event.getListenerId().startsWith("so58769796-")) {
event.getConsumer().seekToBeginning(event.getConsumer().assignment());
}
}
}
和
spring.kafka.listener.idle-event-interval=5000
EDIT2
这是另一种技术 - 在这种情况下,我们在每次应用启动时(和按需)倒带...
Here's another technique - in this case we rewind each time the app starts (and on demand)...
@SpringBootApplication
public class So58769796Application implements ConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So58769796Application.class, args);
}
@KafkaListener(id = "so58769796", topics = "so58769796")
public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println(key + ":" + value);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58769796")
.compact()
.partitions(1)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so58769796", "foo", "bar");
System.out.println("Hit enter to rewind");
System.in.read();
registry.getListenerContainer("so58769796").stop();
registry.getListenerContainer("so58769796").start();
};
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
这篇关于再次使用来自 kafka 日志压缩主题的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!