Handle failure in case the Kafka Broker is down

Problem description

I have a Kafka broker running and messages are consumed successfully, but I want to handle, on the Kafka consumer side, the case where the Kafka broker is down.

I have read this thread, but learned that the relevant logs are only shown at DEBUG level. I was wondering whether I can handle this manually when an event is triggered, because I want to handle the Kafka broker failure myself. Does Spring Kafka provide something to handle this situation?

Please let me know if more details are needed. I would highly appreciate any suggestions that point me in the right direction. Thanks

Edit 1:

As suggested in @Artem's answer, I've tried this in my KafkaConsumer:

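import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;

// Invoked by Spring whenever a listener container publishes a NonResponsiveConsumerEvent
@EventListener
public void handleEvent(NonResponsiveConsumerEvent event) {
    LOGGER.info("*****************************************");
    LOGGER.info("Hello NonResponsiveConsumer {}", event);
    LOGGER.info("*****************************************");
}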

This event is triggered ONCE even when the Kafka server is running (when I first start the application). Please see the logs below:

....
....
2017-12-04 13:08:02,177 INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
2017-12-04 13:08:02,218 INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [52.214.67.60:9091]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = workerListener
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.0
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka commitId : cb8625948210849f
2017-12-04 13:08:02,350 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 
2017-12-04 13:08:02,363 INFO o.s.b.a.e.j.EndpointMBeanExporter - Located managed bean 'auditEventsEndpoint': registering with JMX server as MBean [org.springframework.boot:type=Endpoint,name=auditEventsEndpoint]
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - Hello NonResponsiveConsumer ListenerContainerIdleEvent [timeSinceLastPoll=1.51237491E9s, listenerId=workerListener-0, container=KafkaMessageListenerContainer [id=workerListener-0, clientIndex=-0, topicPartitions=null], topicPartitions=null]
2017-12-04 13:08:02,403 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
....
....

Edit 2:

The issue was resolved by upgrading spring-kafka to 1.3.2.

Recommended answer

Starting with version 1.3.1, there is:

/**
 * An event that is emitted when a consumer is not responding to
 * the poll; a possible indication that the broker is down.
 *
 * @author Gary Russell
 * @since 1.3.1
 *
 */
@SuppressWarnings("serial")
public class NonResponsiveConsumerEvent extends KafkaEvent {

    // ... (fields and accessors omitted)

}

And, quoting the documentation:

In addition, if the broker is unreachable (at the time of writing), the consumer poll() method does not exit, so no messages are received, and idle events can’t be generated. To solve this issue, the container will publish a NonResponsiveConsumerEvent if a poll does not return within 3x the pollInterval property. By default, this check is performed once every 30 seconds in each container. You can modify the behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event will allow you to stop the container(s), thus waking the consumer so it can terminate.
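To make the quoted rule concrete, here is a simplified, stdlib-only Java model of the check. It is not Spring Kafka's code. The class `NonResponsiveCheck` and its methods are hypothetical. Time is passed in explicitly as milliseconds so the logic is easy to follow, and the callback stands in for publishing a `NonResponsiveConsumerEvent`. In a real application you would instead set `monitorInterval` and `noPollThreshold` on `ContainerProperties` and react to the event with an `@EventListener`, for example by stopping the container.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical, simplified model of the "non-responsive consumer" check.
 * NOT Spring Kafka's implementation; it only illustrates the rule
 * "report if no poll has returned within noPollThreshold x pollInterval".
 */
final class NonResponsiveCheck {

    private final long pollIntervalMs;          // poll period (the quoted docs call it pollInterval)
    private final float noPollThreshold;        // multiplier; the quoted docs use 3
    private final LongConsumer onNonResponsive; // stand-in for publishing the event; receives ms since last poll

    private volatile long lastPollMs;           // written by the consumer loop, read by the monitor

    NonResponsiveCheck(long pollIntervalMs, float noPollThreshold,
                       long startMs, LongConsumer onNonResponsive) {
        this.pollIntervalMs = pollIntervalMs;
        this.noPollThreshold = noPollThreshold;
        this.onNonResponsive = onNonResponsive;
        this.lastPollMs = startMs;              // the clock starts when the check is created
    }

    /** Called by the consumer loop each time poll() returns. */
    void pollReturned(long nowMs) {
        lastPollMs = nowMs;
    }

    /**
     * Called periodically by a monitor task (every monitorInterval; 30 s by default per the docs).
     * Returns true and invokes the callback if the consumer looks stuck in poll().
     */
    boolean check(long nowMs) {
        long sinceLastPoll = nowMs - lastPollMs;
        if (sinceLastPoll > (double) noPollThreshold * pollIntervalMs) {
            onNonResponsive.accept(sinceLastPoll);
            return true;
        }
        return false;
    }
}
```

With a 1000 ms poll interval and a threshold of 3, a check 2500 ms after the last poll stays quiet. A check at 3500 ms reports the consumer as non-responsive. Once `pollReturned` records a new poll, later checks are measured from that point.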
