Handle failure in case the Kafka Broker is down
Problem description
I have a Kafka broker running, and messages are consumed successfully, but I want to handle the case where the Kafka broker is down on the Kafka consumer end.
I have read this thread, but learned that the logs are only shown at DEBUG level. I was wondering if I can handle this manually via an event trigger, because I want to handle the failure of the Kafka broker myself. Does Spring Kafka provide something to handle this situation?
Please tell me if any more details are needed. I would highly appreciate any suggestions that point me in the right direction. Thanks.
Edit 1:
As answered by @Artem, I've tried this in my KafkaConsumer:
@EventListener
public void handleEvent(NonResponsiveConsumerEvent event) {
LOGGER.info("*****************************************");
LOGGER.info("Hello NonResponsiveConsumer {}", event);
LOGGER.info("*****************************************");
}
This event is triggered ONCE even when the Kafka server is running (when I start the application for the first time). Please see the logs below:
....
....
2017-12-04 13:08:02,177 INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 0
2017-12-04 13:08:02,218 INFO o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [52.214.67.60:9091]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = workerListener
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.0
2017-12-04 13:08:02,346 INFO o.a.k.c.u.AppInfoParser - Kafka commitId : cb8625948210849f
2017-12-04 13:08:02,350 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
2017-12-04 13:08:02,363 INFO o.s.b.a.e.j.EndpointMBeanExporter - Located managed bean 'auditEventsEndpoint': registering with JMX server as MBean [org.springframework.boot:type=Endpoint,name=auditEventsEndpoint]
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
2017-12-04 13:08:02,397 INFO c.t.m.w.c.k.c.KafkaConsumer - Hello NonResponsiveConsumer ListenerContainerIdleEvent [timeSinceLastPoll=1.51237491E9s, listenerId=workerListener-0, container=KafkaMessageListenerContainer [id=workerListener-0, clientIndex=-0, topicPartitions=null], topicPartitions=null]
2017-12-04 13:08:02,403 INFO c.t.m.w.c.k.c.KafkaConsumer - *****************************************
....
....
Edit 2:
Issue resolved by upgrading spring-kafka to 1.3.2.
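For a Maven build, the upgrade is a one-line version change (a sketch assuming a direct spring-kafka dependency in your pom.xml; adjust accordingly for Gradle or a BOM-managed version):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.2.RELEASE</version>
</dependency>
```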
Recommended answer
Starting with version 1.3.1 there is:
/**
* An event that is emitted when a consumer is not responding to
* the poll; a possible indication that the broker is down.
*
* @author Gary Russell
* @since 1.3.1
*
*/
@SuppressWarnings("serial")
public class NonResponsiveConsumerEvent extends KafkaEvent {
And quoting the documentation:
In addition, if the broker is unreachable (at the time of writing), the consumer poll() method does not exit, so no messages are received and idle events can't be generated. To solve this issue, the container will publish a NonResponsiveConsumerEvent if a poll does not return within 3x the pollInterval property. By default, this check is performed once every 30 seconds in each container. You can modify the behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event will allow you to stop the container(s), thus waking the consumer so it can terminate.