How to stop micro service with Spring Kafka Listener, when connection to Apache Kafka Server is lost?


Problem description


I am currently implementing a micro service which reads data from an Apache Kafka topic. I am using "spring-boot, version: 1.5.6.RELEASE" for the micro service and "spring-kafka, version: 1.2.2.RELEASE" for the listener in the same micro service. This is my Kafka configuration:

@Bean
public Map<String, Object> consumerConfigs() {
    return new HashMap<String, Object>() {{
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    }};
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

I have implemented the listener via the @KafkaListener annotation:

@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
    //business logic
    latch.countDown();
}

I need to be able to shut down the micro service when the listener loses the connection to the Apache Kafka server.

When I kill the Kafka server, I get the following message in the Spring Boot log:

2017-11-01 19:58:15.721  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup

When I start the Kafka server again, I get:

2017-11-01 20:01:37.748  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.

So clearly the Spring Kafka listener in my micro service is able to detect when the Kafka server is up and running and when it is not. In Confluent's book Kafka: The Definitive Guide, the chapter "But How Do We Exit?" says that the wakeup() method needs to be called on the Consumer so that a WakeupException is thrown. So I tried to capture the two events (Kafka server down and Kafka server up) with the @EventListener annotation, as described in the Spring for Apache Kafka documentation, and then call wakeup(). But the example in the documentation shows how to detect an idle consumer, which is not my case. Could someone please help me with this? Thanks in advance.
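
For reference, the idle-detection mechanism from that documentation boils down to enabling idle events on the listener container factory and reacting to them with an @EventListener method. A rough sketch of that pattern, reusing the factory bean from the configuration above (the 60-second interval is an illustrative value, not something taken from this question):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // publish a ListenerContainerIdleEvent when no records arrive for 60 seconds
    factory.getContainerProperties().setIdleEventInterval(60000L);
    return factory;
}

@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    // fires only while the consumer can still poll the broker successfully,
    // so it does not by itself cover the broker-down case described above
}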

Solution

I don't know how to get a notification of the server down condition (in my experience, the consumer goes into a tight loop within the poll()).

However, if you figure that out, you can stop the listener container(s), which will wake up the consumer and exit the tight loop...

@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.stop();

2017-11-01 16:29:54.290 INFO 21217 --- [ad | so47062346] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group so47062346

2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

...

2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped

You can improve the tight loop by adding reconnect.backoff.ms, but the poll() never exits so we can't emit an idle event.

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      group-id: so47062346
    properties:
      reconnect.backoff.ms: 1000

I suppose you could enable idle events and use a timer to detect if you've received no data (or idle events) for some period of time, and then stop the container(s).
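
A rough sketch of that timer idea (this is not code from the original answer): it assumes idle events are enabled on the container factory via ContainerProperties.setIdleEventInterval(...), that scheduling is enabled with @EnableScheduling, and that the existing receive(...) method calls recordActivity() after every record. The class name, the 60-second silence threshold and the 10-second check interval are illustrative choices.

import java.time.Duration;
import java.time.Instant;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class KafkaConnectionWatchdog {

    // how long we tolerate seeing neither records nor idle events
    private static final Duration MAX_SILENCE = Duration.ofSeconds(60);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ApplicationContext context;

    // last time the consumer showed any sign of life
    private volatile Instant lastActivity = Instant.now();

    // to be called from the @KafkaListener receive(...) method for every record
    public void recordActivity() {
        this.lastActivity = Instant.now();
    }

    // idle events are only published while poll() still returns, i.e. while the
    // broker is reachable, so they count as activity as well
    @EventListener
    public void onIdleEvent(ListenerContainerIdleEvent event) {
        this.lastActivity = Instant.now();
    }

    // if nothing has happened for MAX_SILENCE, assume the broker is gone:
    // stop the containers (which wakes up the consumer) and shut the service down
    @Scheduled(fixedDelay = 10000)
    public void checkBroker() {
        if (Duration.between(this.lastActivity, Instant.now()).compareTo(MAX_SILENCE) > 0) {
            this.registry.stop();
            System.exit(SpringApplication.exit(this.context));
        }
    }
}

Whether stopping the containers is enough, or the whole JVM should exit as in the last line, depends on what "stop the micro service" should mean for the deployment in question.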
