为什么Kafka消费者需要很长时间才能开始消费? [英] Why does a Kafka consumer take a long time to start consuming?

查看:21
本文介绍了为什么Kafka消费者需要很长时间才能开始消费?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们启动一个 Kafka 消费者,监听一个可能尚未创建的主题(尽管主题自动创建已启用).

We start a Kafka consumer, listening on a topic which may not yet be created (topic auto creation is enabled though).

此后不久,生产者发布了关于该主题的消息.

Not long thereafter a producer is publishing messages on that topic.

但是,消费者需要一些时间来注意这一点:准确地说是 5 分钟.此时消费者撤销其分区并重新加入消费者组.卡夫卡重新稳定了群体.查看消费者与 kafka 日志的时间戳,这个过程是在消费者端启动的.

However, it takes some time for the consumer to notice this: 5 minutes to be exact. At this point the consumer revokes its partitions and rejoins the consumer group. Kafka re-stabilizes the group. Looking at the time-stamps of the consumer vs. kafka logs, this process is initiated at the consumer side.

我想这是预期的行为,但我想了解这一点.这实际上是重新平衡(从 0 到 1 分区)吗?如果我们预先创建主题,会不会发生这种情况?

I suppose this is expected behavior but I would like to understand this. Is this actually a re-balancing going on (from 0 to 1 partition)? If we'd create topics upfront, would this not happen?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2017-02-01 08:41:45.540  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:41:45.544  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning

kafka 日志

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator)

推荐答案

这可能是由于参数 metadata.max.age.ms 的默认值控制了消费者强制执行的频率刷新主题的元数据.

This is probably due to the default value of the parameter metadata.max.age.ms which controls how often the consumer forces a refresh of metadata for a topic.

当你用一个不存在的主题启动消费者时会发生什么是代理自动创建这个主题,但这需要一些时间来进行领导者选举等,所以当你的消费者请求该主题的元数据时,它会得到一个LEADER_NOT_AVAILABLE 警告,无法获取任何消息.达到上述超时后,消费者刷新元数据,这次成功并开始读取消息.这不依赖于生产者向主题写入消息,它纯粹是消费者的事情.

What happens when you start the consumer up with a non existing topic is that the brokers autocreate this topic, but this takes a little bit of time with leader election etc., so when your consumer requests metadata for that topic it gets a LEADER_NOT_AVAILABLE warning and can't fetch any messages. After the timeout mentioned above is reached the consumer refreshes metadata, successfully this time around and starts reading messages. This is not dependent on a producer writing messages to the topic, it is purely a consumer thing.

如果您以 1000 毫秒的超时时间启动您的消费者,您应该会看到消息被消费之前的更短延迟.

If you start your consumer with for example 1000ms timeout, you should see a much shorter delay until messages are consumed.

此外,如果您预先创建主题,或者在消费者之前启动生产者,这种行为根本不应该发生.

Also, if you create topics up front, or start the producer before the consumer, this behavior should not happen at all.

这篇关于为什么Kafka消费者需要很长时间才能开始消费?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆