Why is the kafka consumer consuming the same message hundreds of times?


Problem description

I see from the logs that the exact same message is consumed 665 times. Why does this happen?

I also see this in the logs:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies 
that the poll loop is spending too much time message processing. You can address this either by increasing the session 
timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Consumer properties

group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=30000
max.poll.records=20

PS: Is it possible to consume only a specific number of messages, like 10 or 50 or 100, out of the 1000 that are in the queue? I was looking at the 'fetch.max.bytes' config, but it seems to limit the message size rather than the number of messages.
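To answer the PS: `max.poll.records` (which is already set to 20 in the properties above) is the setting that caps how many records a single `poll()` returns; `fetch.max.bytes` limits bytes, not record count. A minimal pure-Python sketch of that capping behaviour (the queue and `poll` here are stand-ins for illustration, not the Kafka API):

```python
# Stand-in for a partition log holding 1000 queued messages.
queue = [f"msg-{i}" for i in range(1000)]

def poll(position, max_poll_records):
    """Return at most max_poll_records messages starting at `position`,
    mimicking how Kafka's max.poll.records caps one poll() batch."""
    batch = queue[position:position + max_poll_records]
    return batch, position + len(batch)

# With max.poll.records=50, one poll yields only 50 of the 1000 messages.
batch, pos = poll(0, 50)
print(len(batch), pos)  # 50 50
```

Each subsequent `poll(pos, 50)` call then returns the next 50, which is exactly the "consume only 10 or 50 or 100 at a time" behaviour asked about.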

Thanks

Answer

The answer lies in understanding the following concepts:

  • session.timeout.ms
  • Heartbeats
  • max.poll.interval.ms

In your case, your consumer receives messages via poll() but is not able to finish processing them within max.poll.interval.ms. The broker therefore assumes the consumer is hung, a rebalance of the partitions is triggered, and this consumer loses ownership of all its partitions. It is marked dead and is no longer part of the consumer group.
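The broker-side rule can be sketched as a one-line check (illustrative pseudologic, not the Kafka API; 300000 ms is the default max.poll.interval.ms, your actual value may differ):

```python
# Illustrative sketch of the broker's view of max.poll.interval.ms.
# All times are in milliseconds.
MAX_POLL_INTERVAL_MS = 300_000  # Kafka's default value

def consumer_state(time_since_last_poll_ms):
    """A consumer that does not call poll() within max.poll.interval.ms
    is considered hung: it is evicted and its partitions are rebalanced."""
    if time_since_last_poll_ms > MAX_POLL_INTERVAL_MS:
        return "evicted: partitions rebalanced to other members"
    return "alive: keeps its partition assignment"

print(consumer_state(120_000))  # alive: keeps its partition assignment
print(consumer_state(400_000))  # evicted: partitions rebalanced to other members
```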

Then, when your consumer finishes processing and calls poll() again, two things happen:

  1. The commit fails, because the consumer no longer owns the partitions.
  2. The broker detects that the consumer is up again, so a rebalance is triggered; the consumer rejoins the group, takes ownership of partitions again, and requests messages from the broker. Since the earlier message was never marked as committed (see #1 above: the commit failed) and is still pending processing, the broker delivers the same message to the consumer again.

The consumer again takes a long time to process, and since it cannot finish in less than max.poll.interval.ms, steps 1 and 2 keep repeating in a loop.
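That loop is why the same message shows up 665 times in your logs: the consumed offset only advances on a successful commit. A stand-in simulation of the cycle (again illustrative, not the Kafka API):

```python
# Sketch of the redelivery loop: the offset advances only on a
# successful commit, so a consumer evicted before committing is
# handed the same message again on the next poll().
committed_offset = 0
deliveries = 0

def commit_succeeds():
    """Pretend processing always exceeds max.poll.interval.ms,
    so the group has already rebalanced and every commit fails."""
    return False

for _ in range(5):                   # five poll cycles
    message_offset = committed_offset  # redelivered from the last commit
    deliveries += 1
    if commit_succeeds():
        committed_offset = message_offset + 1  # never reached here

print(deliveries, committed_offset)  # 5 0 -- same message delivered 5 times
```

Replace 5 with 665 and you get exactly the log pattern in the question.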

To fix the problem, you can increase max.poll.interval.ms to a value large enough for the time your consumer needs to process a batch. Your consumer will then no longer be marked dead and will not receive duplicate messages. However, the real fix is to review your processing logic and try to reduce the processing time.
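In the properties file above, that could look like the following. The values are placeholders to illustrate the direction of the change (raise the interval, shrink the batch), not tuned recommendations; pick them from your measured per-batch processing time:

```
max.poll.interval.ms=600000
max.poll.records=5
```

A smaller max.poll.records also helps on its own: less work per poll() means each poll loop iteration finishes well inside max.poll.interval.ms.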

