Spring Kafka-如何使用组ID将偏移量重置为最新值? [英] Spring Kafka - How to reset offset to latest with a group id?
问题描述
我目前正在使用Spring Integration Kafka进行实时统计.不过,该组名称使Kafka可以搜索监听器未读取的所有先前值.
I am currently using Spring Integration Kafka to make real-time statistics. Though, the group name makes Kafka search all the previous values the listener didn't read.
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
我想开始使用最新的偏移量,而不会被旧的值所困扰.是否可以重置组的偏移量?
I would like to begin to the latest offset, and not be bothered by old values. Is there a possibility to reset the offset of the group ?
推荐答案
由于我没有看到任何示例,因此我将在此解释我的工作方式.
Because I didn't saw any example of this, I'm gonna explain how I did here.
您的 @KafkaListener
的类必须实现一个 ConsumerSeekAware
类,该类将允许侦听器控制分配分区时的偏移量查找.(来源: https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )
The class of your @KafkaListener
must implement a ConsumerSeekAware
class, which will permit to the listener to control the offset seeking when partitions are attributed. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )
public class KafkaMessageListener implements ConsumerSeekAware {
@KafkaListener(topics = "your.topic")
public void listen(byte[] payload) {
// ...
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
在这里,重新平衡时,我们使用给定的回调来查找所有给定主题的最后一个偏移量.感谢Artem Bilan( https://stackoverflow.com/users/2756547/artem-bilan )指导了我答案.
Here, on a rebalance, we use the given callback to seek the last offset for all the given topics. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.
这篇关于Spring Kafka-如何使用组ID将偏移量重置为最新值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!