Difference between session.timeout.ms and max.poll.interval.ms for Kafka

Problem description

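AFAIK, max.poll.interval.ms was introduced in Kafka 0.10.1. However, it is still unclear when we would use both session.timeout.ms and max.poll.interval.ms. Consider a use case in which the heartbeat thread is not responding, but my processing thread, because it has a higher value set, is still processing records. Since the heartbeat thread is down, what exactly happens after session.timeout.ms is crossed? I observed in a POC that the consumer rebalance doesn't happen until max.poll.interval.ms is reached, so session.timeout.ms seems redundant to me. A similar question has been posted, but it doesn't answer this question.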

Solution

session.timeout.ms is used to detect consumer failures via the heartbeat mechanism. The consumer's heartbeat thread must send a heartbeat to the broker (the group coordinator) before session.timeout.ms expires. Otherwise, Kafka considers the consumer dead and triggers a rebalance.

heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.

session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
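To make the heartbeat check concrete, here is a minimal Python sketch of the coordinator-side logic described above, under simplifying assumptions: heartbeats arrive instantly, the helper names `heartbeats` and `session_expiry_time` are hypothetical, and the timeout values are illustrative rather than Kafka defaults. It is a model, not Kafka's implementation.

```python
# Simplified model of the session check; NOT Kafka's actual coordinator code.
# All times are in milliseconds.

def heartbeats(interval_ms, stop_ms):
    """Times at which a background thread sends a heartbeat every interval_ms,
    until it stops (crashes or hangs) at stop_ms."""
    return list(range(interval_ms, stop_ms + 1, interval_ms))


def session_expiry_time(heartbeat_times_ms, session_timeout_ms, now_ms, start_ms=0):
    """Return the time at which the coordinator would remove the member, or
    None if no gap between heartbeats exceeded session_timeout_ms by now_ms."""
    last_seen = start_ms
    for t in sorted(heartbeat_times_ms):
        if t - last_seen > session_timeout_ms:
            return last_seen + session_timeout_ms  # expired before this heartbeat
        last_seen = t
    if now_ms - last_seen > session_timeout_ms:
        return last_seen + session_timeout_ms  # nothing received since last_seen
    return None


# The heartbeat thread dies at 9.5 s; its last heartbeat was sent at 9 s,
# so with a 10 s session timeout the member is declared dead at 19 s.
print(session_expiry_time(heartbeats(3_000, 9_500), 10_000, now_ms=30_000))  # prints 19000
```

Note that this check only sees the heartbeat thread. A consumer whose heartbeat thread is healthy but whose processing loop is stuck never trips it, which is why the separate poll check exists.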

Polling is another mechanism for checking consumer health. A consumer is expected to call poll() again before max.poll.interval.ms expires. If this time expires (usually because of long-running message processing), the consumer is again considered dead and a rebalance is triggered.

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
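The poll check can be modeled the same way. In this hypothetical sketch (`poll_schedule` and `poll_timeout_time` are illustrative names, and the processing time per record is assumed constant), poll() is only called again after the whole batch has been processed, so a large batch of slow records exceeds max.poll.interval.ms even though a background heartbeat thread would keep the session alive.

```python
# Simplified model of the poll-interval check; NOT the Kafka client's code.
# All times are in milliseconds.

def poll_schedule(batch_size, per_record_ms, batches):
    """Times of successive poll() calls when each record takes per_record_ms
    and poll() is called again only after the whole batch is processed."""
    t, times = 0, []
    for _ in range(batches):
        times.append(t)
        t += batch_size * per_record_ms
    times.append(t)
    return times


def poll_timeout_time(poll_times_ms, max_poll_interval_ms, now_ms):
    """Return when the consumer would be considered failed for not calling
    poll() in time, or None if every gap stayed within max_poll_interval_ms."""
    last_poll = None
    for t in sorted(poll_times_ms):
        if last_poll is not None and t - last_poll > max_poll_interval_ms:
            return last_poll + max_poll_interval_ms
        last_poll = t
    if last_poll is not None and now_ms - last_poll > max_poll_interval_ms:
        return last_poll + max_poll_interval_ms
    return None


# 500 records at 2 s each take 1,000 s, far beyond a 300 s poll interval.
print(poll_timeout_time(poll_schedule(500, 2_000, 1), 300_000, now_ms=1_000_000))  # prints 300000
# 100 records per batch take 200 s, so poll() is always called in time.
print(poll_timeout_time(poll_schedule(100, 2_000, 3), 300_000, now_ms=600_000))  # prints None
```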

Another important point (since version 0.10.1.0):

rebalance.timeout = max.poll.interval.ms

Since we give the client as much as max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case. We therefore propose to set the rebalance timeout in the Java client to the same value configured with max.poll.interval.ms. When a rebalance begins, the background thread will continue sending heartbeats. The consumer will not rejoin the group until processing completes and the user calls poll(). From the coordinator's perspective, the consumer will not be removed from the group until either 1) their session timeout expires without receiving a heartbeat, or 2) the rebalance timeout expires.
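The two removal conditions in the quote above can be expressed as a small decision function. This is a simplified Python sketch, not Kafka's coordinator code; `rebalance_outcome` is a hypothetical name, and the rebalance timeout is simply passed in as the value of max.poll.interval.ms.

```python
# Simplified model of the two removal conditions (coordinator's view);
# NOT Kafka's real code. Times are in milliseconds.

def rebalance_outcome(rebalance_start_ms, rejoin_ms, last_heartbeat_ms,
                      session_timeout_ms, rebalance_timeout_ms):
    """Return ("rejoined", t) or ("removed", t) for one member.

    rejoin_ms: when the member calls poll() and rejoins (None = never).
    last_heartbeat_ms: time of its final heartbeat (None = heartbeats never stop).
    """
    # Condition 2: the rebalance timeout (= max.poll.interval.ms) always applies.
    removal_ms = rebalance_start_ms + rebalance_timeout_ms
    # Condition 1: the session timeout applies only once heartbeats stop.
    if last_heartbeat_ms is not None:
        removal_ms = min(removal_ms, last_heartbeat_ms + session_timeout_ms)
    if rejoin_ms is not None and rejoin_ms <= removal_ms:
        return ("rejoined", rejoin_ms)
    return ("removed", removal_ms)


SESSION, REBALANCE = 10_000, 300_000  # illustrative values
# Still heartbeating; finishes its batch and polls after 2 minutes: kept.
print(rebalance_outcome(0, 120_000, None, SESSION, REBALANCE))  # prints ('rejoined', 120000)
# Still heartbeating but never polls: removed when the rebalance timeout expires.
print(rebalance_outcome(0, None, None, SESSION, REBALANCE))  # prints ('removed', 300000)
# Heartbeats stopped at 5 s: removed as soon as the session times out.
print(rebalance_outcome(0, None, 5_000, SESSION, REBALANCE))  # prints ('removed', 15000)
```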

So in your case, if session.timeout.ms expires without a heartbeat from a consumer, a rebalance is started in that consumer group. Once the rebalance starts, the partitions of all consumers in the group are revoked, and Kafka waits for every consumer that is still sending heartbeats to call poll() (when it polls, a consumer sends a JoinGroupRequest) until the rebalance timeout, which is equal to max.poll.interval.ms, expires.
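The scenario can be sketched as a timeline. This hypothetical model (`rebalance_timeline` is an illustrative name; network delays and the sync phase of the rebalance are ignored) assumes one member's heartbeats stop while the others keep heartbeating but only rejoin when they next call poll(). It shows how a rebalance triggered by the session timeout can still take up to max.poll.interval.ms to complete when a surviving member is busy processing.

```python
# Simplified timeline of the scenario; NOT Kafka's real code. Times in ms.

def rebalance_timeline(dead_last_heartbeat_ms, session_timeout_ms,
                       max_poll_interval_ms, next_poll_ms):
    """One member's heartbeats stop at dead_last_heartbeat_ms. The other
    members keep heartbeating and call poll() next at next_poll_ms[name]
    (assumed to be after the rebalance starts).

    Returns (rebalance_start_ms, join_done_ms, members_in_new_generation).
    """
    # The silent member's session timeout is what triggers the rebalance.
    start = dead_last_heartbeat_ms + session_timeout_ms
    # Rebalance timeout = max.poll.interval.ms.
    deadline = start + max_poll_interval_ms
    joined = sorted(name for name, t in next_poll_ms.items() if t <= deadline)
    if len(joined) == len(next_poll_ms):
        done = max([start, *next_poll_ms.values()])  # everyone rejoined in time
    else:
        done = deadline  # stragglers are dropped when the timeout expires
    return start, done, joined


# B polls again at 60 s, but C is busy until 400 s.
print(rebalance_timeline(9_000, 10_000, 300_000, {"B": 60_000, "C": 400_000}))
# prints (19000, 319000, ['B'])
```

With C still busy, the join phase ends only at the 19,000 + 300,000 ms deadline, and C is left out of the new generation.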

During the rebalance you can still process messages you have already fetched, but you cannot commit them; the commit fails with a CommitFailedException and this message:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
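Since the limit being exceeded is max.poll.interval.ms, the practical levers are a smaller max.poll.records or a larger max.poll.interval.ms. The rough sizing arithmetic can be sketched as below, assuming a roughly constant worst-case processing time per record; the helper names and the 50% safety margin are hypothetical choices, not Kafka recommendations.

```python
import math

# Back-of-the-envelope sizing; the safety factor and per-record timing are
# assumptions that must be measured for the actual workload.

def max_records_for(max_poll_interval_ms, worst_case_record_ms, safety=0.5):
    """Largest max.poll.records whose worst-case batch finishes within
    safety * max.poll.interval.ms (never less than 1)."""
    budget_ms = max_poll_interval_ms * safety
    return max(1, int(budget_ms // worst_case_record_ms))


def max_interval_for(max_poll_records, worst_case_record_ms, safety=0.5):
    """Smallest max.poll.interval.ms that keeps the same margin for a given
    max.poll.records value."""
    return math.ceil(max_poll_records * worst_case_record_ms / safety)


# With a 300 s poll interval and 2 s per record, keep batches at 75 records...
print(max_records_for(300_000, 2_000))  # prints 75
# ...or keep 500-record batches and allow 2,000 s between polls.
print(max_interval_for(500, 2_000))  # prints 2000000
```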

For more information you can check this.
