What happens to records/messages during consumption when record processing takes longer than 'max.poll.interval.ms'?


Question

I have the consumer settings below.

auto.offset.reset=earliest
enable.auto.commit=true (default value)
session.timeout.ms=10000 (default value)
max.poll.interval.ms=300000 (default value)

With the above configuration, let's say I have five messages (m1, m2, m3, m4 and m5) in topic A, which has only one partition. A consumer is subscribed to this topic; it processed the first two messages (m1 and m2) without any issues and committed their offsets.

Now let's say the consumer received the third message, m3, and processing it took 300100 ms because of some network latency. As I understand it, the offset commit will not happen, because processing took longer than max.poll.interval.ms, so the consumer will be considered dead and removed from the group.

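Now I have two questions:

  1. What happens to message m3? I mean, would it be picked up in the next poll because its offset was not committed?
  2. What happens to the other messages, m4 and m5?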

Recommended answer

Letting max.poll.interval.ms expire without calling poll() is one of the causes of a rebalance. When a rebalance starts in a consumer group, all consumers in the group are revoked, that is, their partition assignments are taken away. During the rebalance, Kafka waits for every healthy consumer to send a JoinGroup request, which happens when it calls poll(), until the rebalance timeout expires (the rebalance timeout equals max.poll.interval.ms). Once all healthy consumers have sent their JoinGroup requests, or the rebalance timeout is reached, Kafka assigns the partitions to the consumers that sent JoinGroup requests.
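
The membership rule described above can be sketched as a small in-memory model. This is only an illustration, not Kafka code: `ToyGroup`, `poll`, and `check_poll_interval` are hypothetical names, time is passed in as plain millisecond values, and the real coordinator protocol (heartbeats, generations, partition assignment) is left out entirely.

```python
from dataclasses import dataclass, field

MAX_POLL_INTERVAL_MS = 300_000  # value from the question's configuration


@dataclass
class ToyGroup:
    """Hypothetical stand-in for a consumer group; not a Kafka API."""
    members: set = field(default_factory=set)
    last_poll_ms: dict = field(default_factory=dict)
    rebalances: int = 0

    def poll(self, member: str, now_ms: int) -> None:
        # A poll() from a non-member acts like a JoinGroup request,
        # which triggers a rebalance.
        if member not in self.members:
            self.members.add(member)
            self.rebalances += 1
        self.last_poll_ms[member] = now_ms

    def check_poll_interval(self, now_ms: int) -> list:
        # Members that have not polled within the interval drop out of the
        # group, which triggers another rebalance.
        expired = sorted(
            m for m in self.members
            if now_ms - self.last_poll_ms[m] > MAX_POLL_INTERVAL_MS
        )
        for m in expired:
            self.members.discard(m)
        if expired:
            self.rebalances += 1
        return expired


group = ToyGroup()
group.poll("c1", now_ms=0)                        # c1 joins: first rebalance
print(group.check_poll_interval(now_ms=300_000))  # exactly at the limit: nobody expires
print(group.check_poll_interval(now_ms=300_100))  # the 300100 ms case: c1 expires
group.poll("c1", now_ms=300_200)                  # c1 polls again and rejoins
print(group.members, group.rebalances)
```

In this model the 300100 ms gap costs two extra rebalances: one when c1 drops out and one when its next poll() brings it back, which matches the sequence the answer describes.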

In your case:

What happens to the message m3? I mean, would it be picked up in the next poll because its offset was not committed?

Answer: Processing continues even after your consumer is revoked, unless you have logic that interrupts the processing thread on revocation. So all the messages returned by the previous poll are processed, but their offsets cannot be committed. If the rebalance assigns this partition to another consumer, the new consumer will receive the same messages, starting from m3, so those messages will be processed twice. When the first consumer calls poll() again, it sends a JoinGroup request, which triggers another rebalance.

What happens to the other messages m4 and m5?

Answer: If these messages were returned by the same poll() as m3, the result is the same. They will be processed by the old consumer, but it cannot commit their offsets. The new consumer will process the messages again and commit the offsets.
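
To make the duplicate processing concrete, here is a toy single-partition model of the scenario. It is a sketch under stated assumptions, not Kafka client code: `replay_after_rebalance` is a hypothetical helper, offsets are plain list indexes, a single poll() is assumed to return m3, m4 and m5 together, and the old consumer's commit is assumed to be rejected after it has left the group.

```python
def replay_after_rebalance(log, committed):
    """Toy single-partition model; offsets are list indexes, not a Kafka API."""
    processed = []  # (consumer, message) pairs in processing order

    # Old consumer: one poll() returns everything after the committed offset.
    # Processing overruns max.poll.interval.ms, so (by assumption) its commit
    # is rejected and `committed` is left unchanged.
    for msg in log[committed:]:
        processed.append(("old", msg))

    # New consumer: starts from the last committed offset, so it sees the
    # same records again, and then commits successfully.
    batch = log[committed:]
    for msg in batch:
        processed.append(("new", msg))
    committed += len(batch)

    return processed, committed


processed, committed = replay_after_rebalance(
    ["m1", "m2", "m3", "m4", "m5"], committed=2  # m1 and m2 already committed
)
print(processed)   # m3, m4, m5 appear once for "old" and once for "new"
print(committed)   # 5
```

Nothing is lost in this model, but m3, m4 and m5 are each handled twice, so the processing logic has to tolerate duplicates.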
