Confluent Kafka Consumer Configuration - How are session.timeout.ms and max.poll.interval.ms related?

Question

I am trying to understand how the default values of the following Confluent consumer configurations work together.

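max.poll.interval.ms - per the documentation, the default value is 300,000 ms

session.timeout.ms - per the documentation, the default value is 10,000 ms

heartbeat.interval.ms - per the documentation, the default value is 3,000 ms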
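To make the setup concrete, here is a minimal sketch of a consumer configuration carrying these defaults, written as the kind of dictionary the confluent-kafka Python client's Consumer constructor accepts. The bootstrap.servers and group.id values are placeholders that are not part of the question, and the dictionary is only printed here, not used to connect.

```python
# Consumer configuration with the default values quoted in the question.
# "localhost:9092" and "example-group" are placeholder values (assumptions).
DEFAULT_CONF = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "max.poll.interval.ms": 300_000,  # max gap allowed between poll() calls
    "session.timeout.ms": 10_000,     # max time without a successful heartbeat
    "heartbeat.interval.ms": 3_000,   # how often the consumer sends heartbeats
}

# With a real cluster this dict could be passed to confluent_kafka.Consumer(...).
for key in ("max.poll.interval.ms", "session.timeout.ms", "heartbeat.interval.ms"):
    print(f"{key} = {DEFAULT_CONF[key]} ms")
```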

Suppose I use these default values in my configuration. Here is my question.

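For example, suppose the consumer sends a heartbeat every 3,000 ms, my first poll happens at timestamp t1, and my second poll happens at t1 + 20,000 ms. Does this cause a rebalance because "session.timeout.ms" has been exceeded? Or does it work fine, because the consumer did send its heartbeats at the expected times?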

Answer

A previous thread (here) also explains session timeout and max poll timeout. Let me also explain my understanding of them.

ConsumerRecords poll(final long timeout): fetches data sequentially from the topic's partitions, starting from the last consumed offset or from a manually set offset. It returns immediately if records are available; otherwise it waits up to the given timeout and, if the timeout passes, returns an empty set of records. The application keeps calling poll() both to fetch newly arrived messages and to show that the consumer is still alive. Underneath the covers, the following settings govern this:
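The return-immediately-or-wait behavior of poll() can be illustrated with a toy model. This is a sketch only, not a Kafka client: ToyConsumer and deliver are hypothetical names, an in-memory queue stands in for records fetched from a partition, and the timeout is in seconds rather than milliseconds.

```python
import queue
import time


class ToyConsumer:
    """Toy stand-in for a consumer's poll() semantics (not a real client)."""

    def __init__(self) -> None:
        self._buffer: "queue.Queue[str]" = queue.Queue()

    def deliver(self, record: str) -> None:
        # Simulates a record arriving from the broker.
        self._buffer.put(record)

    def poll(self, timeout: float) -> list[str]:
        """Return buffered records at once, or wait up to `timeout` seconds.

        Returns an empty list if nothing arrives before the timeout expires.
        """
        records: list[str] = []
        try:
            # Block until one record is available or the timeout elapses.
            records.append(self._buffer.get(timeout=timeout))
        except queue.Empty:
            return records
        # Drain anything else already buffered, without waiting.
        while True:
            try:
                records.append(self._buffer.get_nowait())
            except queue.Empty:
                return records


consumer = ToyConsumer()
consumer.deliver("m1")
consumer.deliver("m2")
print(consumer.poll(timeout=1.0))  # records available: returns ['m1', 'm2'] at once
start = time.monotonic()
print(consumer.poll(timeout=0.1))  # nothing buffered: waits about 0.1 s, returns []
print(f"waited {time.monotonic() - start:.2f} s")
```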

session.timeout.ms: The consumer coordinator sends heartbeats to the broker to show that the consumer's session is live and active. If the broker does not receive any heartbeat within session.timeout.ms, it removes that consumer from the group and triggers a rebalance.
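A broker-side toy model of this rule might look like the following. It is a sketch, not Kafka's implementation: ToyGroupCoordinator and its methods are hypothetical, times are integers in milliseconds, and bumping a generation counter stands in for the actual rebalance.

```python
from dataclasses import dataclass, field


@dataclass
class ToyGroupCoordinator:
    """Toy model: evicts members whose last heartbeat is older than the session timeout."""

    session_timeout_ms: int = 10_000
    last_heartbeat: dict[str, int] = field(default_factory=dict)
    generation: int = 0  # incremented on every (simulated) rebalance

    def heartbeat(self, member: str, now_ms: int) -> None:
        self.last_heartbeat[member] = now_ms

    def check_sessions(self, now_ms: int) -> list[str]:
        """Remove expired members and simulate a rebalance if any were removed."""
        expired = [
            m for m, t in self.last_heartbeat.items()
            if now_ms - t > self.session_timeout_ms
        ]
        for m in expired:
            del self.last_heartbeat[m]
        if expired:
            self.generation += 1  # stands in for "the group rebalances"
        return expired


coordinator = ToyGroupCoordinator()
coordinator.heartbeat("consumer-a", now_ms=0)
coordinator.heartbeat("consumer-b", now_ms=0)
coordinator.heartbeat("consumer-a", now_ms=9_000)  # only consumer-a keeps heartbeating
print(coordinator.check_sessions(now_ms=12_000))   # ['consumer-b']
print(coordinator.generation)                      # 1
```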

You can think of session.timeout.ms as the maximum time the broker waits for a heartbeat from the consumer, whereas heartbeat.interval.ms is the expected interval at which the consumer sends heartbeats to the broker. That is why heartbeat.interval.ms must always be lower than session.timeout.ms; ideally it is no more than 1/3 of the session timeout.
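The relationship between the two settings can be expressed as a small validation helper. This is a minimal sketch; check_heartbeat_settings and heartbeats_per_session are hypothetical helpers, and they encode only the two rules stated above.

```python
def check_heartbeat_settings(session_timeout_ms: int, heartbeat_interval_ms: int) -> list[str]:
    """Return warnings for the heartbeat/session relationship.

    Rules: heartbeat.interval.ms must be lower than session.timeout.ms and
    should typically be no higher than 1/3 of it.
    """
    warnings: list[str] = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append("heartbeat.interval.ms is above 1/3 of session.timeout.ms")
    return warnings


def heartbeats_per_session(session_timeout_ms: int, heartbeat_interval_ms: int) -> int:
    """How many full heartbeat intervals fit into one session timeout."""
    return session_timeout_ms // heartbeat_interval_ms


print(check_heartbeat_settings(10_000, 3_000))  # [] : the defaults are fine
print(heartbeats_per_session(10_000, 3_000))    # 3
print(check_heartbeat_settings(10_000, 5_000))  # one warning: above 1/3
```

With the defaults, roughly three heartbeats are attempted per session timeout, so a single lost heartbeat does not by itself expire the session.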

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This is the maximum time the consumer can stay idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances in order to reassign its partitions to another consumer instance. If we are doing long batch processing, it is reasonable to increase max.poll.interval.ms, but note that increasing this value may delay a group rebalance, since the consumer only joins the rebalance inside the call to poll(). Alternatively, we can keep max.poll.interval.ms low and instead reduce max.poll.records, so that each batch takes less time to process.
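The max.poll.records tuning mentioned above is simple arithmetic: a full batch must be processed before the next poll() deadline. This is a sketch under stated assumptions: batch_fits and max_safe_poll_records are hypothetical helpers, per_record_ms is an assumed worst-case processing time per record, and safety_factor is an illustrative margin. The value 500 is the Java consumer's default max.poll.records.

```python
def batch_fits(max_poll_records: int, per_record_ms: float, max_poll_interval_ms: int) -> bool:
    """True if processing a full batch finishes before the poll deadline."""
    return max_poll_records * per_record_ms < max_poll_interval_ms


def max_safe_poll_records(max_poll_interval_ms: int, per_record_ms: float,
                          safety_factor: float = 0.5) -> int:
    """Largest batch size whose worst-case processing fits a fraction of the limit."""
    budget_ms = max_poll_interval_ms * safety_factor
    return max(1, int(budget_ms // per_record_ms))


# 500 records at 1,000 ms each would take 500 s, longer than the 300 s limit.
print(batch_fits(500, per_record_ms=1_000, max_poll_interval_ms=300_000))  # False
print(max_safe_poll_records(300_000, per_record_ms=1_000))                 # 150
```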

Now let's discuss how they relate to each other.

While the application calls poll(), the consumer checks the heartbeat, the session timeout, and the poll timeout in the background, roughly as follows:

  1. The consumer coordinator first checks whether the consumer is in the middle of a rebalance. If it is, it waits for the coordinator to finish joining the consumer to the group, and then poll() is called. Note that a large max.poll.interval.ms can make a rebalance take longer.
  2. Once the rebalance has completed, the coordinator checks the session timeout. If the session timeout has expired without a successful heartbeat, the old coordinator is treated as disconnected, so the next poll will try to rebalance. The session timeout is therefore tied directly to coordinator liveness: if the session times out, the consumer treats its coordinator as dead, and poll() has to find a new coordinator before rebalancing.
  3. After the session timeout check, the coordinator checks heartbeat.pollTimeoutExpired. If the poll timeout has expired, the foreground thread has stalled between calls to poll(), so the member explicitly leaves the group. Its next poll() rejoins the group as a new member; the group coordinator itself is not replaced.
  4. After the session timeout and poll timeout checks, and before sending a heartbeat, the consumer coordinator checks whether a heartbeat is due. If it is not due yet (for example, while backing off after a failed heartbeat), it waits for the retry backoff and then polls again.
  5. If a heartbeat is due, the consumer coordinator sends it with sendHeartbeatRequest.
  6. If sendHeartbeatRequest succeeds, the thread resets the heartbeat timer and continues. If it fails and the consumer group is not rebalancing, the thread wakes up the consumer coordinator so that it polls again.
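The checks in the list above can be modeled as a simplified decision function. This is a minimal sketch, not the actual client code: Action, HeartbeatState, next_action and on_heartbeat_response are hypothetical names, times are plain millisecond integers, and the defaults are the values from the question.

```python
from dataclasses import dataclass
from enum import Enum, auto


class Action(Enum):
    WAIT_FOR_JOIN = auto()     # step 1: group is rebalancing
    FIND_COORDINATOR = auto()  # step 2: session expired, coordinator treated as dead
    LEAVE_GROUP = auto()       # step 3: poll() not called within max.poll.interval.ms
    BACKOFF = auto()           # step 4: no heartbeat due yet, wait and loop again
    SEND_HEARTBEAT = auto()    # step 5: heartbeat is due


@dataclass
class HeartbeatState:
    """Snapshot of the timers the heartbeat loop looks at (times in ms)."""
    now: int
    rebalancing: bool
    last_successful_heartbeat: int
    last_heartbeat_sent: int
    last_poll: int
    session_timeout_ms: int = 10_000
    heartbeat_interval_ms: int = 3_000
    max_poll_interval_ms: int = 300_000


def next_action(s: HeartbeatState) -> Action:
    """One iteration of the simplified loop, checked in the order of the steps."""
    if s.rebalancing:
        return Action.WAIT_FOR_JOIN
    if s.now - s.last_successful_heartbeat > s.session_timeout_ms:
        return Action.FIND_COORDINATOR
    if s.now - s.last_poll > s.max_poll_interval_ms:
        return Action.LEAVE_GROUP
    if s.now - s.last_heartbeat_sent < s.heartbeat_interval_ms:
        return Action.BACKOFF
    return Action.SEND_HEARTBEAT


def on_heartbeat_response(s: HeartbeatState, success: bool) -> None:
    # Step 6: a successful response resets the session timer.
    if success:
        s.last_successful_heartbeat = s.now


state = HeartbeatState(now=20_000, rebalancing=False,
                       last_successful_heartbeat=17_000,
                       last_heartbeat_sent=17_000, last_poll=0)
action = next_action(state)
print(action.name)  # SEND_HEARTBEAT: 20,000 ms since the last poll is within the limit
if action is Action.SEND_HEARTBEAT:
    state.last_heartbeat_sent = state.now
    on_heartbeat_response(state, success=True)
```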

As mentioned in the linked thread, polling is independent of heartbeating: even when the gap between polls is large, the heartbeat thread keeps sending heartbeats, which shows that your consumer is alive. In other words, the session timeout is not directly linked to poll().
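To illustrate that heartbeats keep flowing while the processing thread is busy, here is a toy sketch using Python threads. It does not talk to Kafka; run_demo is a hypothetical helper, and the intervals are scaled down from the defaults so it finishes in under a second.

```python
import threading
import time


def run_demo(processing_s: float = 0.5, heartbeat_interval_s: float = 0.05) -> int:
    """Count heartbeats sent while the main thread is busy between two polls."""
    stop = threading.Event()
    heartbeats = 0

    def heartbeat_loop() -> None:
        nonlocal heartbeats
        # Event.wait returns False on timeout, so this loops until stop is set.
        while not stop.wait(heartbeat_interval_s):
            heartbeats += 1  # stands in for sending a heartbeat request

    thread = threading.Thread(target=heartbeat_loop, daemon=True)
    thread.start()
    # First "poll" happened here; simulate slow record processing.
    time.sleep(processing_s)
    # Second "poll" would happen here; stop the background thread.
    stop.set()
    thread.join()
    return heartbeats


if __name__ == "__main__":
    print(f"heartbeats sent during processing: {run_demo()}")  # roughly 10
```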

session.timeout.ms: Max time to receive a heartbeat

max.poll.interval.ms: Max time the processing thread may take between calls to poll(), independent of heartbeats

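So if you set max.poll.interval.ms to 300,000, you have 300,000 ms until the next poll, which means the consumer thread has at most 300,000 ms to finish processing. Meanwhile, the heartbeat thread keeps sending heartbeat requests every heartbeat.interval.ms (3,000 ms) to show that the consumer is still alive. If no heartbeat succeeds within session.timeout.ms (10,000 ms), the coordinator is considered dead, and poll() has to find a new coordinator and rebalance. So in the question's example, a second poll at t1 + 20,000 ms does not cause a rebalance: 20,000 ms is well within max.poll.interval.ms, and the background heartbeats keep the session alive.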
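The question's scenario can be checked with a small timeline model. This is a simplified sketch under stated assumptions: rebalance_cause is a hypothetical helper, heartbeats and polls are given as explicit timestamps in ms starting at 0, every listed heartbeat is assumed successful, and a limit counts as exceeded only when a gap is strictly longer than it.

```python
from typing import List, Optional


def rebalance_cause(poll_times_ms: List[int], heartbeat_times_ms: List[int],
                    end_ms: int, session_timeout_ms: int = 10_000,
                    max_poll_interval_ms: int = 300_000) -> Optional[str]:
    """Return the limit that is exceeded first on a simplified timeline, or None.

    Both lists hold sorted timestamps in ms, starting at 0; end_ms is when
    observation stops.
    """

    def first_violation(times: List[int], limit: int) -> Optional[int]:
        # Earliest time at which the gap since the previous event exceeds limit.
        previous = 0
        for t in times + [end_ms]:
            if t - previous > limit:
                return previous + limit
            previous = t
        return None

    expiries = [
        (first_violation(heartbeat_times_ms, session_timeout_ms), "session.timeout.ms"),
        (first_violation(poll_times_ms, max_poll_interval_ms), "max.poll.interval.ms"),
    ]
    exceeded = [(t, name) for t, name in expiries if t is not None]
    return min(exceeded)[1] if exceeded else None


heartbeats = list(range(0, 40_001, 3_000))  # every 3,000 ms, heartbeat thread healthy
# Question's example: polls at t1 = 0 and t1 + 20,000 ms.
print(rebalance_cause([0, 20_000], heartbeats, end_ms=40_000))  # None
# Same polls, but heartbeats stop after 9,000 ms (the heartbeat thread stalls).
print(rebalance_cause([0, 20_000], [0, 3_000, 6_000, 9_000], end_ms=40_000))  # session.timeout.ms
# Heartbeats healthy, but the second poll only comes at 310,000 ms.
print(rebalance_cause([0, 310_000], list(range(0, 320_001, 3_000)), end_ms=320_000))  # max.poll.interval.ms
```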
