Confluent Kafka Consumer Configuration - How are session.timeout.ms and max.poll.interval.ms related?

Problem description

I'm trying to understand how the default values of the following Confluent consumer configurations work together.

max.poll.interval.ms - per the Confluent documentation, the default value is 300,000 ms

session.timeout.ms - per the Confluent documentation, the default value is 10,000 ms

heartbeat.interval.ms - per the Confluent documentation, the default value is 3,000 ms

Say I use these default values in my configuration. My question is this:

Suppose a consumer sends a heartbeat every 3,000 ms, my first poll happens at timestamp t1, and my second poll happens at t1 + 20,000 ms. Would that cause a rebalance because the gap exceeds session.timeout.ms? Or would it work fine because the consumer sent its heartbeats on schedule?

Solution

A previous thread also explains the session timeout and the max poll timeout. Here is my understanding of them.

ConsumerRecords poll(final long timeout): fetches data sequentially from a topic's partitions, starting from the last consumed offset or a manually set offset. It returns immediately if records are available; otherwise it waits up to the given timeout and, if the timeout elapses, returns an empty set of records. The consumer keeps calling poll to fetch newly arrived messages, and these calls also demonstrate the consumer's liveness. Under the covers, the following settings come into play.

session.timeout.ms: while the consumer is running, the consumer coordinator sends heartbeats to the broker to show that the consumer's session is alive and active. If the broker receives no heartbeat within session.timeout.ms, it removes that consumer from the group and triggers a rebalance.

You can think of session.timeout.ms as the maximum time the broker waits for a heartbeat from the consumer, whereas heartbeat.interval.ms is how often the consumer is expected to send one. That is why heartbeat.interval.ms must always be lower than session.timeout.ms; ideally it is no more than 1/3 of the session timeout.
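To make the 1/3 guideline concrete, here is a minimal sketch of a sanity check over a plain dictionary of settings. The helper name check_heartbeat_config is hypothetical and is not part of any Kafka client; only the two configuration keys come from the discussion above.

```python
def check_heartbeat_config(config):
    """Return a list of warnings for a consumer config dict.

    Hypothetical helper for illustration only; it is not a Kafka client API.
    """
    session_ms = config["session.timeout.ms"]
    heartbeat_ms = config["heartbeat.interval.ms"]
    problems = []
    if heartbeat_ms >= session_ms:
        # No heartbeat could arrive before the session expires.
        problems.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_ms * 3 > session_ms:
        # Allowed, but leaves little room for a lost or delayed heartbeat.
        problems.append("heartbeat.interval.ms is above 1/3 of session.timeout.ms")
    return problems


# The defaults quoted in the question satisfy the guideline: 3 * 3,000 <= 10,000.
defaults = {"session.timeout.ms": 10_000, "heartbeat.interval.ms": 3_000}
print(check_heartbeat_config(defaults))  # prints []
```

With the quoted defaults, roughly three heartbeats fit into one session timeout, so a single delayed heartbeat should not by itself expire the session.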

max.poll.interval.ms: the maximum delay between invocations of poll() when using consumer group management. It bounds how long the consumer may go without fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances in order to reassign its partitions to another consumer instance. For long batch processing it helps to increase max.poll.interval.ms, but note that a larger value may delay a group rebalance, since a consumer only joins the rebalance inside a call to poll. Alternatively, we can keep the max poll interval low by lowering max.poll.records, so that each batch takes less time to process.
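The trade-off between max.poll.interval.ms and max.poll.records comes down to simple arithmetic. The sketch below is only a rough estimate: the helper name, the 1,000 ms per-record processing time, and the 50% safety margin are all assumptions chosen for illustration, not values from the Confluent documentation.

```python
def max_records_per_poll(max_poll_interval_ms, per_record_ms, safety_factor=0.5):
    """Estimate a max.poll.records value that keeps one batch inside the poll interval.

    Hypothetical helper: per_record_ms is your own measured worst-case processing
    time per record, and safety_factor reserves headroom for slow records.
    """
    budget_ms = max_poll_interval_ms * safety_factor
    # Always allow at least one record, even if a single record blows the budget.
    return max(1, int(budget_ms // per_record_ms))


# With the default 300,000 ms interval and an assumed 1,000 ms per record,
# half of the interval leaves room for 150 records per batch.
print(max_records_per_poll(300_000, 1_000))  # prints 150
```

If the estimate comes out at 1 and a single record can still take longer than the interval, lowering max.poll.records no longer helps and raising max.poll.interval.ms is the remaining option.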

Now let's discuss how they relate to each other.

While the consumer is calling poll, the heartbeat, the session timeout, and the poll timeout are checked in the background as follows:

  1. The consumer coordinator checks whether the consumer is in a rebalancing state. If it is still rebalancing, it waits for the coordinator to let the consumer join, then polls again. Note that a large max.poll.interval.ms makes a rebalance take longer.
  2. Once the poll and the rebalance have completed, the coordinator checks the session timeout. If the session timeout has expired without a successful heartbeat, the old coordinator is disconnected, so the next poll tries to rebalance. The session timeout therefore depends directly on coordinator liveness: once it expires, the consumer treats the coordinator as dead, and the next poll has to find a new coordinator before rebalancing.
  3. After the session timeout check, the coordinator evaluates heartbeat.pollTimeoutExpired. If the poll timeout has expired, the foreground thread has stalled between calls to poll(), so the member explicitly leaves the group. The next call to poll rejoins it as a new consumer; the group coordinator itself is not replaced.
  4. After the session timeout and poll timeout checks, and before sending a heartbeat, the consumer coordinator checks the heartbeat timer. If a heartbeat should not be sent yet, it waits for the retry backoff and then checks again.
  5. Otherwise, the consumer coordinator sends the heartbeat with sendHeartbeatRequest.
  6. If sendHeartbeatRequest succeeds, the thread resets the heartbeat timer and polls again. If it fails and the consumer group is not rebalancing, it wakes up the consumer group coordinator so that poll is called again.
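The order of these checks can be sketched as a small decision function. This is a simplified model of my reading of the steps above, not the client's actual code: the HeartbeatState class, the next_action function, and the returned action names are all hypothetical, and step 4 is modeled as "no heartbeat is due yet".

```python
from dataclasses import dataclass


@dataclass
class HeartbeatState:
    """Hypothetical snapshot of the timers the background checks look at."""
    now_ms: int
    last_heartbeat_response_ms: int  # last successful heartbeat
    last_poll_ms: int                # last call to poll() by the foreground thread
    last_heartbeat_sent_ms: int      # last heartbeat request sent
    rebalancing: bool = False


def next_action(s, session_timeout_ms=10_000, max_poll_interval_ms=300_000,
                heartbeat_interval_ms=3_000):
    """Return the action for one pass, checking conditions in the order listed above."""
    if s.rebalancing:
        return "wait_for_rejoin"            # step 1
    if s.now_ms - s.last_heartbeat_response_ms > session_timeout_ms:
        return "mark_coordinator_dead"      # step 2
    if s.now_ms - s.last_poll_ms > max_poll_interval_ms:
        return "leave_group"                # step 3
    if s.now_ms - s.last_heartbeat_sent_ms < heartbeat_interval_ms:
        return "wait_retry_backoff"         # step 4
    return "send_heartbeat"                 # steps 5 and 6


# 20 s after the last poll, with heartbeats flowing: the consumer just sends another one.
state = HeartbeatState(now_ms=20_000, last_heartbeat_response_ms=18_000,
                       last_poll_ms=0, last_heartbeat_sent_ms=17_000)
print(next_action(state))  # prints send_heartbeat
```

Because the poll timeout is checked separately from the session timeout, a consumer that keeps heartbeating but stops polling still ends up at "leave_group" once max.poll.interval.ms has passed.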

As the linked thread mentions, polling is independent of heartbeating: even when the gap between polls is long, heartbeats are still sent, which shows that your consumer is alive. The session timeout is therefore not directly tied to poll.

session.timeout.ms: maximum time allowed without receiving a heartbeat

max.poll.interval.ms: maximum time the processing thread may take between polls

So if you set max.poll.interval.ms to 300,000, the consumer has up to 300,000 ms until its next poll, which means the consumer thread has at most 300,000 ms to finish processing. In the meantime, heartbeat requests keep going out every heartbeat.interval.ms (3,000 ms) to show that the consumer is still alive. If no heartbeat succeeds within session.timeout.ms (10,000 ms), the coordinator is treated as dead, and the next poll has to find a coordinator again and rebalance.
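Applied to the scenario in the question, the reasoning can be checked with a short calculation. This is a sketch under stated assumptions: I read the question's second poll as happening 20,000 ms after the first, the function poll_gap_outcome is hypothetical, and the consumer_alive flag simply stands for "the process is up and its heartbeats are getting through".

```python
def poll_gap_outcome(poll_gap_ms, consumer_alive=True, session_timeout_ms=10_000,
                     max_poll_interval_ms=300_000, heartbeat_interval_ms=3_000):
    """Return (rebalance, reason, heartbeats_sent) for one gap between two polls.

    Hypothetical model of the rules described above, using the quoted defaults.
    """
    heartbeats_sent = poll_gap_ms // heartbeat_interval_ms if consumer_alive else 0
    if poll_gap_ms > max_poll_interval_ms:
        return True, "max.poll.interval.ms exceeded", heartbeats_sent
    if not consumer_alive and poll_gap_ms > session_timeout_ms:
        return True, "session.timeout.ms exceeded", heartbeats_sent
    return False, "no timeout exceeded", heartbeats_sent


# The question's case: 20,000 ms between polls, heartbeats every 3,000 ms.
print(poll_gap_outcome(20_000))
# prints (False, 'no timeout exceeded', 6)
```

Under these assumptions the 20,000 ms gap does not cause a rebalance: it is longer than session.timeout.ms, but the session is kept alive by heartbeats rather than by poll, and the gap is well below max.poll.interval.ms.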
