Spring for Kafka 2.3: setting an offset at runtime for a specific listener with KafkaMessageListenerContainer

Question

I have to implement functionality to (re-)set a listener for a certain topic/partition to any given offset. So if events are committed up to offset 5 and the admin decides to reset the offset to 2, then events 3, 4 and 5 should be reprocessed.

We are using Spring for Kafka 2.3, and I was trying to follow the documentation on ConsumerSeekAware, which seems to be exactly what I am looking for.

The problem, however, is that we also use topics that are created at runtime. For those we create a KafkaMessageListenerContainer through a DefaultKafkaConsumerFactory, and I don't know where to put registerSeekCallback or something similar.

Is there any way to achieve this? I have trouble understanding how a class that uses the @KafkaListener annotation maps to the way listeners are created in the factory.

Any help would be appreciated, even if it is only an explanation of how these things work together.

This is basically how the KafkaMessageListenerContainer instances are created:

public KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(String topicName,
        ContainerPropertiesStrategy containerPropertiesStrategy) {
    MessageListener<String, String> messageListener = getMessageListener(topicName);

    ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerFactoryConfiguration());

    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = createKafkaMessageListenerContainer(topicName, messageListener, bootstrapServers, containerPropertiesStrategy, consumerFactory);
    return kafkaMessageListenerContainer;
}

public MessageListener<String, String> getMessageListener(String topic) {
    MessageListener<String, String> messageListener = new MessageListener<String, String>() {

        @Override
        public void onMessage(ConsumerRecord<String, String> message) {
            try {
                consumerService.consume(topic, message.value());
            } catch (IOException e) {
                log.log(Level.WARNING, "Message couldn't be consumed", e);
            }
        }
    };
    return messageListener;
}

public static KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(
  String topicName, MessageListener<String, String> messageListener, String bootstrapServers, ContainerPropertiesStrategy containerPropertiesStrategy,
  ConsumerFactory<String, Object> consumerFactory) {
    ContainerProperties containerProperties = containerPropertiesStrategy.getContainerPropertiesForTopic(topicName);
    containerProperties.setMessageListener(messageListener);

    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(
            consumerFactory, containerProperties);
    kafkaMessageListenerContainer.setBeanName(topicName);
    return kafkaMessageListenerContainer;
}

I hope that helps.

Recommended answer

The key component is AbstractConsumerSeekAware. Hopefully this will provide enough to get you started...

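import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

@SpringBootApplication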
public class So59682801Application {

    public static void main(String[] args) {
        SpringApplication.run(So59682801Application.class, args).close();
    }


    @Bean
    public ApplicationRunner runner(ListenerCreator creator,
            KafkaTemplate<String, String> template, GenericApplicationContext context) {

        return args -> {
            System.out.println("Hit enter to create a listener");
            System.in.read();

            ConcurrentMessageListenerContainer<String, String> container =
                    creator.createContainer("so59682801group", "so59682801");

            // register the container as a bean so that all the "...Aware" interfaces are satisfied
            context.registerBean("so59682801", ConcurrentMessageListenerContainer.class, () -> container);
            context.getBean("so59682801", ConcurrentMessageListenerContainer.class); // re-fetch to initialize

            container.start();

            // send some messages
            IntStream.range(0, 10).forEach(i -> template.send("so59682801", "test" + i));

            System.out.println("Hit enter to reseek");
            System.in.read();
            ((MyListener) container.getContainerProperties().getMessageListener())
                .reseek(new TopicPartition("so59682801", 0), 5L);

            System.out.println("Hit enter to exit");
            System.in.read();
        };
    }

}

@Component
class ListenerCreator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    ListenerCreator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        factory.getContainerProperties().setIdleEventInterval(5000L);
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> createContainer(String groupId, String... topics) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topics);
        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setMessageListener(new MyListener());
        return container;
    }

}

class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        System.out.println(data);
    }

    public void reseek(TopicPartition partition, long offset) {
        getSeekCallbackFor(partition).seek(partition.topic(), partition.partition(), offset);
    }

}

Calling reseek() on the listener does not move the consumer immediately. It queues the seek, and the consumer thread performs it after it returns from the current poll() and before it issues the next one.
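
To make the "queued seek" behaviour concrete, here is a minimal plain-Java model of the idea. It is not Spring for Kafka's implementation: the class `SeekQueueModel`, its methods `requestSeek` and `pollOnce`, and the `partition@offset` record strings are all hypothetical names invented for this sketch. It only assumes what the answer states, namely that a seek requested from another thread is stored and then applied by the consumer thread before its next poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy model of a consumer loop that applies queued seeks between polls (hypothetical, not Spring's code). */
class SeekQueueModel {

    /** A pending request to move the read position of one partition. */
    static final class SeekRequest {
        final int partition;
        final long offset;

        SeekRequest(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Thread-safe queue: any thread may add requests, only the consumer thread drains it.
    private final Queue<SeekRequest> pendingSeeks = new ConcurrentLinkedQueue<>();
    private final long[] positions; // next offset to read, per partition
    private final long endOffset;   // first offset that does not exist yet (same for every partition here)

    SeekQueueModel(int partitions, long endOffset) {
        this.positions = new long[partitions];
        this.endOffset = endOffset;
    }

    /** Safe to call from any thread: it only enqueues and never touches the positions. */
    void requestSeek(int partition, long offset) {
        pendingSeeks.add(new SeekRequest(partition, offset));
    }

    /** Called by the single consumer thread: apply queued seeks first, then read up to maxRecords. */
    List<String> pollOnce(int maxRecords) {
        SeekRequest seek;
        while ((seek = pendingSeeks.poll()) != null) {
            positions[seek.partition] = seek.offset;
        }
        List<String> records = new ArrayList<>();
        for (int p = 0; p < positions.length; p++) {
            while (positions[p] < endOffset && records.size() < maxRecords) {
                records.add(p + "@" + positions[p]++);
            }
        }
        return records;
    }
}
```

In this model, a partition with ten records is read completely by the first `pollOnce`. A later `requestSeek(0, 5)` changes nothing until the next `pollOnce`, which then returns the records from offset 5 onward again. This mirrors why the reseek in the answer only takes effect once the consumer thread comes back around to poll.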
