为什么Kafka消费者需要很长时间才能开始消费? [英] Why does a Kafka consumer take a long time to start consuming?

查看:279
本文介绍了为什么Kafka消费者需要很长时间才能开始消费?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们启动一个Kafka使用者,监听尚未创建的主题(尽管启用了主题自动创建).

We start a Kafka consumer, listening on a topic which may not yet be created (topic auto creation is enabled though).

此后不久,生产者就发布有关该主题的消息.

Not long thereafter a producer is publishing messages on that topic.

但是,消费者要花一些时间注意:准确地说是5分钟.此时,消费者撤消其分区并重新加入消费者组.卡夫卡重新稳定了集团.查看消费者与kafka日志的时间戳,此过程是在消费者端启动的.

However, it takes some time for the consumer to notice this: 5 minutes to be exact. At this point the consumer revokes its partitions and rejoins the consumer group. Kafka re-stabilizes the group. Looking at the time-stamps of the consumer vs. kafka logs, this process is initiated at the consumer side.

我认为这是预期的行为,但我想了解这一点.这实际上是正在进行的重新平衡(从0分区到1分区)吗?如果我们要预先创建主题,这会不会发生?

I suppose this is expected behavior but I would like to understand this. Is this actually a re-balancing going on (from 0 to 1 partition)? If we'd create topics upfront, would this not happen?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2017-02-01 08:41:45.540  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning

kafka日志

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator)

推荐答案

这可能是由于参数 metadata.max.age.ms 的默认值所控制的,该参数控制消费者强制执行一次刷新主题的元数据.

This is probably due to the default value of the parameter metadata.max.age.ms which controls how often the consumer forces a refresh of metadata for a topic.

当您使用不存在的主题启动消费者时,经纪人会自动创建此主题,但这需要花费一些时间进行领导者选举等,因此当您的消费者请求该主题的元数据时,它将获得一个LEADER_NOT_AVAILABLE警告,无法提取任何消息. 达到上述超时后,消费者将刷新元数据,这次成功并开始读取消息.这不依赖于生产者向该主题编写消息,而纯粹是消费者的事情.

What happens when you start the consumer up with a non existing topic is that the brokers autocreate this topic, but this takes a little bit of time with leader election etc., so when your consumer requests metadata for that topic it gets a LEADER_NOT_AVAILABLE warning and can't fetch any messages. After the timeout mentioned above is reached the consumer refreshes metadata, successfully this time around and starts reading messages. This is not dependent on a producer writing messages to the topic, it is purely a consumer thing.

例如,如果您以1000毫秒的超时启动使用者,那么在消息被消耗之前,您应该看到的延迟要短得多.

If you start your consumer with for example 1000ms timeout, you should see a much shorter delay until messages are consumed.

此外,如果您先创建主题,或者在使用者之前创建生产者,则此行为根本不会发生.

Also, if you create topics up front, or start the producer before the consumer, this behavior should not happen at all.

这篇关于为什么Kafka消费者需要很长时间才能开始消费?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆