What happens to records/messages during consumption when record processing takes longer than 'max.poll.interval.ms'?

Problem description

I have the following consumer settings:

auto.offset.reset=earliest
enable.auto.commit=true (default value)
session.timeout.ms=10000 (default value)
max.poll.interval.ms= 300000 (default value)

With the above configuration, let's say I have five messages (m1, m2, m3, m4 and m5) in topic A, which has only one partition. My consumer is subscribed to this topic and has processed the first two messages (m1 and m2) without any issues, and their offsets have been committed.

Now, let's say the consumer fetches the third message, m3, and because of some network latency it takes 300100 ms to process it. As I understand it, the record processing took longer than max.poll.interval.ms. So the offset commit will not happen, and the consumer will be considered dead and removed from the group.
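As a quick check of the timing in this scenario, the Python sketch below computes how far past the poll deadline the next poll() lands. It assumes a simplified model in which the deadline is measured from the moment poll() returned m3. The helper name poll_deadline_missed_by is hypothetical and is not part of any Kafka client.

```python
# Values taken from the scenario above (milliseconds).
MAX_POLL_INTERVAL_MS = 300_000   # max.poll.interval.ms (default)
PROCESSING_TIME_MS = 300_100     # time spent processing m3


def poll_deadline_missed_by(poll_returned_at_ms: int,
                            next_poll_at_ms: int,
                            max_poll_interval_ms: int) -> int:
    """Hypothetical helper: how many ms past the deadline the next poll() lands.

    Returns 0 if the next poll() happens in time. Simplified model: the
    deadline is measured from the moment the previous poll() returned.
    """
    deadline_ms = poll_returned_at_ms + max_poll_interval_ms
    return max(0, next_poll_at_ms - deadline_ms)


# poll() returns m3 at t = 0; the next poll() can only happen after processing.
late_by = poll_deadline_missed_by(0, PROCESSING_TIME_MS, MAX_POLL_INTERVAL_MS)
print(f"next poll() is {late_by} ms past the deadline")  # next poll() is 100 ms past the deadline
```

In this model the next poll() arrives only 100 ms late. That is still enough: the deadline passes while m3 is being processed.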

Now I have two questions:

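  1. What happens to message m3? I mean, will it be picked up in the next poll, since its offset was not committed?
  2. What happens to the other messages, m4 and m5?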

Recommended answer

Letting max.poll.interval.ms expire without calling poll() is one of the causes of a rebalance. When a rebalance starts in a consumer group, all consumers in the group have their partitions revoked, meaning the current assignment is discarded. During the rebalance, Kafka waits for every healthy consumer to send a JoinGroup request, which a consumer does by calling poll(). It waits until the rebalance timeout expires, and the rebalance timeout equals max.poll.interval.ms. Once all healthy consumers have sent their JoinGroup requests, or the rebalance timeout has expired, the partitions are assigned among the consumers that sent JoinGroup requests.
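The rebalance steps above can be modeled roughly as follows. This is a deliberately simplified Python sketch, not the real group protocol. It assumes the behavior described in the answer, where every member's partitions are revoked first. It treats the rebalance timeout as equal to max.poll.interval.ms and uses a round-robin split in place of the group's configured assignor. The names Member and rebalance, and the two-consumer group, are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Member:
    member_id: str
    # Time at which this member's poll() sends JoinGroup; None = never (still busy).
    join_sent_at_ms: Optional[int]
    partitions: List[int] = field(default_factory=list)


def rebalance(members, partitions, started_at_ms, rebalance_timeout_ms):
    """Simplified rebalance (hypothetical model, not the real protocol)."""
    # 1. Every member's current assignment is revoked.
    for m in members:
        m.partitions = []
    # 2. Only members whose JoinGroup arrives before the timeout are kept.
    deadline_ms = started_at_ms + rebalance_timeout_ms
    joined = [m for m in members
              if m.join_sent_at_ms is not None and m.join_sent_at_ms <= deadline_ms]
    # 3. Partitions are split round-robin among the members that joined.
    if joined:
        for i, p in enumerate(sorted(partitions)):
            joined[i % len(joined)].partitions.append(p)
    return joined


MAX_POLL_INTERVAL_MS = 300_000  # also used as the rebalance timeout

# Hypothetical group: A is stuck processing m3, B calls poll() promptly.
a = Member("consumer-A", join_sent_at_ms=None, partitions=[0])
b = Member("consumer-B", join_sent_at_ms=50)
joined = rebalance([a, b], [0], started_at_ms=0,
                   rebalance_timeout_ms=MAX_POLL_INTERVAL_MS)
print([m.member_id for m in joined], a.partitions, b.partitions)
# ['consumer-B'] [] [0]
```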

In your case:

What happens to the message m3? I mean, would it be picked up in the next poll, since its offset was not committed?

Answer: Processing continues even after your consumer's partitions are revoked, unless you have logic that interrupts the processing thread on revocation. So all the messages returned by the previous poll() are still processed, but their offsets cannot be committed. If the rebalance assigns this partition to another consumer, that consumer starts from the last committed offset. It therefore receives the same messages again, starting from m3, so those messages are processed twice. When the first consumer eventually calls poll() again, it sends a JoinGroup request, which triggers another rebalance.
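The duplicate processing follows from where a newly assigned consumer starts reading: the partition's last committed offset. The Python sketch below models one partition as a list with a committed offset, which is 2 after m1 and m2. It also keeps a generation counter that is bumped on every rebalance. Rejecting commits from an older generation loosely mirrors how the group coordinator rejects commits from a member that is no longer in the group. PartitionLog and its methods are hypothetical, not a Kafka API, and consumer B is a hypothetical second member that receives the partition.

```python
class PartitionLog:
    """Hypothetical single-partition model: messages plus a committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0    # next offset the group will read from
        self.generation = 0   # bumped on every rebalance

    def fetch(self, start, max_records):
        return self.messages[start:start + max_records]

    def commit(self, offset, generation):
        # A member from an older generation (removed by a rebalance) cannot commit.
        if generation != self.generation:
            return False
        self.committed = offset
        return True


log = PartitionLog(["m1", "m2", "m3", "m4", "m5"])
processed = []

# Consumer A processes m1, m2 and commits offset 2.
gen_a = log.generation
for msg in log.fetch(0, 2):
    processed.append(("A", msg))
first_commit_ok = log.commit(2, gen_a)

# A's next poll returns m3..m5; processing exceeds max.poll.interval.ms,
# so a rebalance happens while A is still working.
batch_a = log.fetch(log.committed, 3)
log.generation += 1                    # rebalance: A is no longer a member
for msg in batch_a:                    # A keeps processing anyway
    processed.append(("A", msg))
a_commit_ok = log.commit(log.committed + len(batch_a), gen_a)  # rejected

# Consumer B now owns the partition and resumes at the committed offset.
gen_b = log.generation
batch_b = log.fetch(log.committed, 3)
for msg in batch_b:
    processed.append(("B", msg))
b_commit_ok = log.commit(log.committed + len(batch_b), gen_b)

twice = sorted({m for c, m in processed if c == "A"} & {m for c, m in processed if c == "B"})
print(a_commit_ok, batch_b, log.committed)  # False ['m3', 'm4', 'm5'] 5
```

Consumer A's commit is rejected, so B starts again at m3. As a result, m3, m4 and m5 are processed by both consumers.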

What happens to the other messages m4 and m5?

Answer: If these messages were returned by the same poll() as m3, the result is the same. They will be processed, but the old consumer cannot commit their offsets. The new consumer will process them again and commit the offsets.
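The two cases in this answer can be compared with another small, self-contained Python model. It assumes the old consumer processed its whole stuck batch but could not commit. It also assumes the new consumer reads from the committed offset (2) to the end of the partition and commits successfully. The function after_rebalance is hypothetical. max.poll.records, mentioned in a comment, is the real consumer setting that caps how many records one poll() returns.

```python
MESSAGES = ["m1", "m2", "m3", "m4", "m5"]
COMMITTED = 2  # m1 and m2 were processed and committed


def after_rebalance(stuck_batch_size):
    """Hypothetical model of the two cases in the answer.

    The old consumer's last poll() returned `stuck_batch_size` records starting
    at the committed offset; it processed them all, but its commit was rejected.
    The new consumer reads from the committed offset to the end and commits.
    Returns (processed_twice, new_only, final_committed_offset).
    """
    old_batch = MESSAGES[COMMITTED:COMMITTED + stuck_batch_size]
    new_batch = MESSAGES[COMMITTED:]
    processed_twice = [m for m in new_batch if m in old_batch]
    new_only = [m for m in new_batch if m not in old_batch]
    return processed_twice, new_only, COMMITTED + len(new_batch)


# Case 1: m4 and m5 came back from the same poll() as m3.
print(after_rebalance(3))  # (['m3', 'm4', 'm5'], [], 5)
# Case 2: the stuck poll() returned only m3 (e.g. with max.poll.records=1).
print(after_rebalance(1))  # (['m3'], ['m4', 'm5'], 5)
```

In the second case only m3 is duplicated. The new consumer sees m4 and m5 for the first time.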