Spring Kafka-如何使用组ID将偏移量重置为最新值? [英] Spring Kafka - How to reset offset to latest with a group id?

查看:349
本文介绍了Spring Kafka-如何使用组ID将偏移量重置为最新值?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在使用Spring Integration Kafka进行实时统计.不过,该组名称使Kafka可以搜索监听器未读取的所有先前值.

I am currently using Spring Integration Kafka to make real-time statistics. Though, the group name makes Kafka search all the previous values the listener didn't read.

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

我想开始使用最新的偏移量,而不会被旧的值所困扰.是否可以重置组的偏移量?

I would like to begin to the latest offset, and not be bothered by old values. Is there a possibility to reset the offset of the group ?

推荐答案

由于我没有看到任何示例,因此我将在此解释我的工作方式.

Because I didn't saw any example of this, I'm gonna explain how I did here.

您的 @KafkaListener 的类必须实现一个 ConsumerSeekAware 类,该类将允许侦听器控制分配分区时的偏移量查找.(来源: https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

The class of your @KafkaListener must implement a ConsumerSeekAware class, which will permit to the listener to control the offset seeking when partitions are attributed. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

在这里,重新平衡时,我们使用给定的回调来查找所有给定主题的最后一个偏移量.感谢Artem Bilan( https://stackoverflow.com/users/2756547/artem-bilan )指导了我答案.

Here, on a rebalance, we use the given callback to seek the last offset for all the given topics. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.

这篇关于Spring Kafka-如何使用组ID将偏移量重置为最新值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆