Consuming messages again from a Kafka log compaction topic
Question
I have a Spring application with a Kafka consumer that uses a @KafkaListener annotation. The topic being consumed is log compacted, and we might face a scenario where we must consume the topic's messages again. What's the best way to achieve this programmatically? We don't control the Kafka topic configuration.
Answer
You can seek to the beginning of each assigned partition from the listener itself; inject the Consumer as a parameter and trigger the rewind with a flag:

```java
@KafkaListener(...)
public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    if (this.resetNeeded) {
        // Rewind every currently assigned partition to its earliest offset
        consumer.seekToBeginning(consumer.assignment());
        this.resetNeeded = false;
    }
}
```
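The resetNeeded flag above is not defined in the snippet; something outside the listener (an admin endpoint, a scheduled job, a JMX operation) has to set it, while the listener reads and clears it on a container thread. One way to model that safely is an AtomicBoolean consumed with compareAndSet, so the seek happens exactly once per request; a minimal plain-Java sketch (the class and method names are illustrative, not part of the original answer):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ResetFlag {

    private final AtomicBoolean resetNeeded = new AtomicBoolean(false);

    // Called from wherever the rewind is requested (admin endpoint, scheduler, ...)
    public void requestReset() {
        resetNeeded.set(true);
    }

    // Called from the listener thread: returns true exactly once per request,
    // so the seek runs a single time even if records keep arriving.
    public boolean shouldResetNow() {
        return resetNeeded.compareAndSet(true, false);
    }

    public static void main(String[] args) {
        ResetFlag flag = new ResetFlag();
        System.out.println(flag.shouldResetNow()); // no reset requested yet
        flag.requestReset();
        System.out.println(flag.shouldResetNow()); // consumes the pending request
        System.out.println(flag.shouldResetNow()); // already consumed
    }
}
```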
If you want to reset when the listener is idle (no records), you can enable idle events and perform the seeks by listening for a ListenerContainerIdleEvent in an ApplicationListener or @EventListener method. The event has a reference to the consumer.
EDIT
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
public class So58769796Application {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("One:" + key + ":" + value);
    }

    @KafkaListener(id = "so58769796a", topics = "so58769796")
    public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("Two:" + key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    boolean reset;

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            this.reset = true;
        };
    }

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        if (this.reset && event.getListenerId().startsWith("so58769796-")) {
            event.getConsumer().seekToBeginning(event.getConsumer().assignment());
            this.reset = false; // clear the flag so we only rewind once per request
        }
    }

}
```
and

```properties
spring.kafka.listener.idle-event-interval=5000
```
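If setting the property isn't convenient, the same interval can be configured in Java on the listener container factory. A sketch, assuming a Spring Boot auto-configured ConsumerFactory is available to inject (the bean name mirrors Boot's default):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaIdleEventConfig {

    // Equivalent to spring.kafka.listener.idle-event-interval=5000:
    // publish a ListenerContainerIdleEvent after 5s with no records.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setIdleEventInterval(5000L);
        return factory;
    }

}
```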
EDIT2
Here's another technique - in this case we rewind each time the app starts (and on demand)...
```java
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
public class So58769796Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println(key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            // Restarting the container triggers a rebalance, so
            // onPartitionsAssigned runs again and seeks to the beginning
            registry.getListenerContainer("so58769796").stop();
            registry.getListenerContainer("so58769796").start();
        };
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}
```