当与Apache Kafka Server的连接断开时,如何使用Spring Kafka Listener停止微服务? [英] How to stop micro service with Spring Kafka Listener, when connection to Apache Kafka Server is lost?

查看:270
本文介绍了当与Apache Kafka Server的连接断开时,如何使用Spring Kafka Listener停止微服务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我当前正在实现一个微服务,该服务从Apache Kafka主题读取数据.我在微服务中使用的是"spring-boot,版本:1.5.6.RELEASE",在同一微服务中的侦听器是"spring-kafka,版本:1.2.2.RELEASE".这是我的kafka配置:

I am currently implementing a micro service, which reads data from Apache Kafka topic. I am using "spring-boot, version: 1.5.6.RELEASE" for the micro service and "spring-kafka, version: 1.2.2.RELEASE" for the listener in the same micro service. This is my kafka configuration:

    @Bean
public Map<String, Object> consumerConfigs() {
    return new HashMap<String, Object>() {{
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    }};
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

我已经通过 @KafkaListener 注释实现了侦听器:

I have implemented the listener via the @KafkaListener annotation:

@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
    //business logic
    latch.countDown();
}

当侦听器断开与Apache Kafka服务器的连接时,我需要能够关闭微服务.

I need to be able to shutdown the micro service, when the listener looses connection to the Apache Kafka server.

当我杀死kafka服务器时,在春季启动日志中会收到以下消息:

When I kill the kafka server I get the following message in the spring boot log:

2017-11-01 19:58:15.721  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup

启动卡夫卡剃须刀时,我得到:

When I start the kafka sarver, I get:

2017-11-01 20:01:37.748  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.

因此,很明显,我的微服务中的Spring Kafka Listener能够检测Kafka Server何时启动并运行,何时不启动.在Confluent的书中 Kafka权威指南但是我们如何退出?一章中,据说需要在Consumer上调用 wakeup()方法,以便使 WakeupException 将被抛出.因此,我尝试使用 @EventListener 标记捕获两个事件(Kafka服务器关闭和Kafka服务器启动),如

So clearly the Spring Kafka Listener in my micro service is able to detect when the Kafka Server is up and running and when it's not. In the book by confluent Kafka The Definitive Guide in chapter But How Do We Exit? it is said that the wakeup() method needs to be called on the Consumer, so that a WakeupException would be thrown. So I tried to capture the two events (Kafka server down and Kafka server up) with the @EventListener tag, as described in the Spring for Apache Kafka documentation, and then call wakeup(). But the example in the documentation is on how to detect idle consumer, which is not my case. Could someone please help me with this. Thanks in advance.

推荐答案

我不知道如何获取服务器停机状态的通知(以我的经验,消费者在 poll内陷入了一个紧密的循环()).

I don't know how to get a notification of the server down condition (in my experience, the consumer goes into a tight loop within the poll()).

但是,如果您知道了这一点,则可以停止侦听器容器,这将唤醒使用者并退出紧密循环...

However, if you figure that out, you can stop the listener container(s) which will wake up the consumer and exit the tight loop...

@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.stop();

2017-11-01 16:29:54.290信息21217 --- [广告|so47062346] o.a.k.c.c.internals.AbstractCoordinator:将so47062346组的协调器localhost:9092(id:2147483647 rack:null)标记为无效

2017-11-01 16:29:54.290 INFO 21217 --- [ad | so47062346] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group so47062346

2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient:无法建立到节点0的连接.经纪人可能不可用.

2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

...

2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient:无法建立到节点0的连接.经纪人可能不可用.

2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

2017-11-01 16:30:00.680信息21217 --- [ntainer#0-0-C-1] essageListenerContainer $ ListenerConsumer:消费者已停止

2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped

您可以通过添加 reconnect.backoff.ms 来改善紧密循环,但是 poll()不会退出,因此我们无法发出空闲事件.

You can improve the tight loop by adding reconnect.backoff.ms, but the poll() never exits so we can't emit an idle event.

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      group-id: so47062346
    properties:
      reconnect.backoff.ms: 1000

我想您可以启用空闲事件并使用计时器来检测一段时间内是否未收到任何数据(或空闲事件),然后停止容器.

I suppose you could enable idle events and use a timer to detect if you've received no data (or idle events) for some period of time, and then stop the container(s).

这篇关于当与Apache Kafka Server的连接断开时,如何使用Spring Kafka Listener停止微服务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆