Spring Kafka: Close the container and read the messages from specific offset with ConcurrentKafkaListenerContainerFactory

Question

In my Spring Kafka application, I want to trigger the consumer at runtime based on input from a scheduler. The scheduler tells the listener which topic it can start consuming messages from. It is a Spring Boot application with a custom ConcurrentKafkaListenerContainerFactory class. I need to perform three tasks:

  1. Close the container after successfully reading all the messages available on the topic.
  2. Store the current offset in a DB or on the file system.
  3. The next time the consumer starts, use the stored offset to process records instead of the default offset managed by Kafka, so that in future we can change the offset value in the DB and get the desired reports.

I know how to handle all of this with @KafkaListener, but I am not sure how to hook it into ConcurrentKafkaListenerContainerFactory. The current code is listed below:

@SpringBootApplication
public class KafkaApp{


    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("testTopic").partitions(1).replicas(1).build();
    }

}

@Component
class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    private static final Method otherListen;

    static {
        try {
            otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
        }
        catch (NoSuchMethodException | SecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final MessageHandlerMethodFactory methodFactory;

    private final KafkaAdmin admin;

    private final KafkaTemplate<String, String> template;

    public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
            KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {

        this.factory = factory;
        this.admin = admin;
        this.template = template;
        this.methodFactory = bpp.getMessageHandlerMethodFactory();
    }

    @KafkaListener(id = "myId", topics = "testTopic")
    public void listen(String topicName) {
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !record.key().equals("foo"));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.setBeanName(topicName + ".container");
        container.start();
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

}

Edit

@SpringBootApplication
public class KafkaApp{


    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("testTopic").partitions(1).replicas(1).build();
    }

}

@Component
class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    private static final Method otherListen;

    static {
        try {
            otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
        }
        catch (NoSuchMethodException | SecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final MessageHandlerMethodFactory methodFactory;

    private final KafkaAdmin admin;

    private final KafkaTemplate<String, String> template;

    public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
            KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {

        this.factory = factory;
        this.admin = admin;
        this.template = template;
        this.methodFactory = bpp.getMessageHandlerMethodFactory();
    }

    @KafkaListener(id = "myId", topics = "testTopic")
    public void listen(String topicName) {
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !record.key().equals("foo"));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.setBeanName(topicName + ".container");
        container.getContainerProperties().setIdleEventInterval(3000L); 
        container.start();
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

    @EventListener
    public void eventHandler(ListenerContainerIdleEvent event) {
        log.info("No messages received for " + event.getIdleTime() + " milliseconds");
    }

}

Answer

You can receive ListenerContainerIdleEvents when there are no messages left to process, and you can use such an event to stop the container. Perform the stop() on a different thread, not the one that publishes the event.

See How to check if Kafka is empty using Spring Kafka?

You can get the partition/offset in several ways, for example from the ConsumerRecords themselves or from message headers.

void otherListen(List<ConsumerRecord<..., ...>> records)

void otherListen(List<String> others,
    @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets)
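As a rough sketch of what could be done with those two header lists, the helper below works out, per partition, the offset of the next record to read (highest offset seen plus one), which is the value you would typically persist. `NextOffsets` and `compute` are hypothetical names, not part of Spring Kafka, and the sketch assumes the two lists are parallel (entry i of each list describes the same record). Records dropped by a filter never reach the listener, so their offsets would not be counted here.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper, not part of Spring Kafka. */
final class NextOffsets {

    private NextOffsets() {
    }

    /**
     * For each partition, returns the highest offset seen in the batch plus one,
     * i.e. the position of the first record that has not been delivered yet.
     */
    static Map<Integer, Long> compute(List<Integer> partitions, List<Long> offsets) {
        if (partitions.size() != offsets.size()) {
            throw new IllegalArgumentException("partitions and offsets must have the same size");
        }
        Map<Integer, Long> next = new TreeMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            // merge keeps the larger of the value already stored and the new candidate
            next.merge(partitions.get(i), offsets.get(i) + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        // Values as they might arrive in the two @Header lists of a batch listener.
        System.out.println(compute(List.of(0, 0, 1), List.of(5L, 6L, 2L))); // prints {0=7, 1=3}
    }
}
```

Inside otherListen, the result of `NextOffsets.compute(partitions, offsets)` could then be handed to whatever store (DB or file) you use.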

You can specify the starting offset in

new TopicPartitionOffset(topicName, 0, startOffset);

when creating the container.
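Where the startOffset value comes from is up to you. One possible sketch, assuming a plain properties file is good enough as the "file system" store from the question, is shown below. `FileOffsetStore`, its method names, and the `topic-partition` key layout are all hypothetical and only illustrate the idea; a DB table would work the same way.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical file-backed offset store; the class and file layout are illustrative only. */
final class FileOffsetStore {

    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    /** Returns the stored start offset for topic/partition, or defaultOffset if none was saved. */
    long load(String topic, int partition, long defaultOffset) throws IOException {
        String value = read().getProperty(key(topic, partition));
        return value == null ? defaultOffset : Long.parseLong(value);
    }

    /** Saves the offset of the next record to read for topic/partition. */
    void save(String topic, int partition, long nextOffset) throws IOException {
        Properties props = read();
        props.setProperty(key(topic, partition), Long.toString(nextOffset));
        try (OutputStream out = Files.newOutputStream(this.file)) {
            props.store(out, "next offset per topic-partition");
        }
    }

    private Properties read() throws IOException {
        Properties props = new Properties();
        if (Files.exists(this.file)) {
            try (InputStream in = Files.newInputStream(this.file)) {
                props.load(in);
            }
        }
        return props;
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    public static void main(String[] args) throws IOException {
        FileOffsetStore store = new FileOffsetStore(Files.createTempFile("offsets", ".properties"));
        store.save("testTopic", 0, 42L);
        // The loaded value is what would be passed as startOffset to
        // new TopicPartitionOffset(topicName, 0, startOffset).
        System.out.println(store.load("testTopic", 0, 0L)); // prints 42
    }
}
```

Because the file is plain text, the stored value can be edited by hand before the next run, which matches the requirement of changing the offset later to re-run a report.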

EDIT

To stop the container when it is idle, set the idleEventInterval, add an @EventListener method, and stop the container from it.

TaskExecutor exec = new SimpleAsyncTaskExecutor();

@EventListener
void idle(ListenerContainerIdleEvent event) {
    log...
    this.exec.execute(() -> event.getContainer(ConcurrentMessageListenerContainer.class).stop());
}

If you add concurrency to your containers, each child container would need to go idle before you stop the parent container.
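One way this bookkeeping might look is sketched below. `IdleTracker` is a hypothetical helper, not a Spring Kafka class. It assumes each child container can be identified by a distinct string (for example the listenerId carried by the ListenerContainerIdleEvent, such as foo.container-0) and that you know the configured concurrency.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper that decides when every child container has gone idle. */
final class IdleTracker {

    private final int concurrency;

    private final Set<String> idleChildren = new HashSet<>();

    IdleTracker(int concurrency) {
        this.concurrency = concurrency;
    }

    /**
     * Records an idle event for one child. Returns true only on the call that
     * makes the last outstanding child idle, so repeated idle events from the
     * same child do not trigger a second stop.
     */
    synchronized boolean childIdle(String listenerId) {
        boolean added = this.idleChildren.add(listenerId);
        return added && this.idleChildren.size() == this.concurrency;
    }

    /** Call when a child receives records again, so that it no longer counts as idle. */
    synchronized void childActive(String listenerId) {
        this.idleChildren.remove(listenerId);
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(2);
        System.out.println(tracker.childIdle("foo.container-0")); // prints false
        System.out.println(tracker.childIdle("foo.container-1")); // prints true
    }
}
```

In the @EventListener method, the parent container would then be stopped (still on a separate thread) only when `childIdle(...)` returns true; how to obtain the parent from the event depends on your setup and is left out here.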

EDIT2

I just added it to the code I wrote for the answer to your other question and it works exactly as expected.

    @KafkaListener(id = "so69134055", topics = "so69134055")
    public void listen(String topicName) {
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !record.key().equals("foo"));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.getContainerProperties().setIdleEventInterval(3000L);
        container.setBeanName(topicName + ".container");
        container.start();
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

    TaskExecutor exec = new SimpleAsyncTaskExecutor();

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        log.info(event.toString());
        this.exec.execute(() -> {
            ConcurrentMessageListenerContainer<?, ?> container = event.getContainer(ConcurrentMessageListenerContainer.class);
            log.info("stopping container: " + container.getBeanName());
            container.stop();
        });
    }

[foo.container-0-C-1] Others: [test0, test2, test4, test6, test8]
[foo.container-0-C-1] ListenerContainerIdleEvent [idleTime=5.007s, listenerId=foo.container-0, container=KafkaMessageListenerContainer [id=foo.container-0, clientIndex=-0, topicPartitions=[foo-0]], paused=false, topicPartitions=[foo-0]]
[SimpleAsyncTaskExecutor-1] stopping container: foo.container
[foo.container-0-C-1] [Consumer clientId=consumer-group.for.foo-2, groupId=group.for.foo] Unsubscribed all topics or patterns and assigned partitions
[foo.container-0-C-1] Metrics scheduler closed
[foo.container-0-C-1] Closing reporter org.apache.kafka.common.metrics.JmxReporter
[foo.container-0-C-1] Metrics reporters closed
[foo.container-0-C-1] App info kafka.consumer for consumer-group.for.foo-2 unregistered
[foo.container-0-C-1] group.for.foo: Consumer stopped
