Apache Kafka: Understanding the relationship between the timeout parameter of poll() and fetch.max.wait.ms

Question

I want to understand the relationship between the timeout parameter of the poll() method and the fetch.max.wait.ms configuration. Suppose I have the following configuration:

          fetch.min.bytes = 50
          fetch.max.wait.ms = 400 ms
          poll() timeout = 200 ms

Now suppose I call poll() with the timeout above. The consumer sends a fetch request to the Kafka broker that leads the partition. The broker does not yet have the fetch.min.bytes it needs, so it waits up to 400 ms for enough data to arrive before responding. But poll() has a 200 ms timeout. Does that mean that, under the hood, the consumer waits only 200 ms for the broker to respond to the fetch request and then terminates the connection?

Is that what happens? If so, would it be safe to say that you should always configure the timeout as:

         timeout >= network latency + fetch.max.wait.ms

Also, does the Kafka consumer fetch records proactively? That is, while my code is busy processing the records returned by the last poll() call, is the consumer fetching more records in the background so that the next poll() returns with less latency? If so, how does it maintain this internal buffer, and can we configure its maximum size?

Thanks in advance.

Recommended answer

The timeout on poll() lets you process asynchronously. After subscribing to a set of topics, the consumer automatically joins the group when poll(long) is invoked. The poll API is designed to ensure consumer availability.

As long as the consumer continues to call poll(), it stays in the group and continues to receive messages from the partitions it was assigned.

Under the hood, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
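The session timeout rule above can be pictured as a simple check: the member is expired once a gap between heartbeats exceeds session.timeout.ms. The sketch below is a toy model under that assumption, not Kafka code; the function name first_expiry_ms and the 10000 ms timeout are hypothetical values chosen only for illustration.

```python
def first_expiry_ms(heartbeat_times_ms, session_timeout_ms, end_ms):
    """Toy model of session-timeout expiry (not Kafka's implementation).

    Returns prev_heartbeat + session_timeout_ms for the first gap between
    heartbeats (or between the last heartbeat and end_ms) that exceeds
    session_timeout_ms, i.e. when the member would be considered dead.
    Returns None if no such gap occurs before end_ms.
    """
    times = sorted(heartbeat_times_ms)
    # Pair each heartbeat with the next one; the last pairs with end_ms.
    for prev, nxt in zip(times, times[1:] + [end_ms]):
        if nxt - prev > session_timeout_ms:
            return prev + session_timeout_ms
    return None


# Hypothetical timelines with a 10000 ms session timeout.
healthy = first_expiry_ms([0, 3000, 6000, 9000], session_timeout_ms=10000, end_ms=12000)
stalled = first_expiry_ms([0, 3000, 15000], session_timeout_ms=10000, end_ms=20000)
```

Here `healthy` is None because no gap exceeds 10000 ms, while `stalled` is 13000: the 12000 ms gap after the heartbeat at 3000 ms means the member would be considered dead 10000 ms after that heartbeat.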

But be careful not to make the value passed to poll(long) too long: when no data is available, poll() blocks the calling thread for up to that long, which effectively makes the whole process synchronous. See the discussion at the link below.

https://github.com/confluentinc/confluent-kafka-dotnet/issues/142

fetch.max.wait.ms: when a fetch request arrives, the broker holds it for up to the specified time. This applies when there is not enough data to immediately satisfy the requirement given by fetch.min.bytes.

Point 1: When a fetch request arrives and fewer than 50 bytes are available, the broker holds the request for up to 400 ms; it responds earlier if 50 bytes accumulate in the meantime.

fetch.min.bytes = 50
fetch.max.wait.ms = 400 ms
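Point 1 can be modeled as a simple rule: the broker answers as soon as at least fetch.min.bytes are available, or when fetch.max.wait.ms expires, whichever comes first. The sketch below is a simplified model under that assumption, not broker code; the function name broker_fetch_response and the arrival timelines are hypothetical.

```python
def broker_fetch_response(arrivals, fetch_min_bytes=50, fetch_max_wait_ms=400):
    """Toy model of how a broker answers one fetch request.

    arrivals: list of (ms_after_request, size_bytes) for data that becomes
    available to this fetch. Returns (response_time_ms, bytes_returned).
    """
    available = 0
    for arrived_ms, size in sorted(arrivals):
        if arrived_ms > fetch_max_wait_ms:
            break  # arrives after the broker has already answered
        available += size
        if available >= fetch_min_bytes:
            return arrived_ms, available  # enough data: answer right away
    # Not enough data in time: answer with whatever is there (possibly 0 bytes).
    return fetch_max_wait_ms, available


# Hypothetical timelines using the question's settings (50 bytes, 400 ms).
enough_early = broker_fetch_response([(100, 30), (250, 30)])  # (250, 60)
too_little = broker_fetch_response([(100, 30)])                # (400, 30)
```

In the model, the 400 ms limit is only reached when data trickles in too slowly; otherwise the response goes out as soon as the 50-byte threshold is met.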

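Point 2: With a 200 ms timeout, poll() returns to your code within 200 ms even when no data has arrived, and as long as you keep calling it, the consumer stays in the group and Kafka does not trigger a rebalance. (In the Java client since 0.10.1, the heartbeats themselves are sent by a background thread every heartbeat.interval.ms, independently of the poll timeout.)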

poll() timeout = 200 ms

While Point 1 is happening, your consumer has nothing to process, but because of Point 2, poll() returns every 200 ms and you keep calling it, so no rebalance occurs and you can do other work between polls instead of sitting blocked for the full 400 ms.
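The timeline in Points 1 and 2 can be sketched as a toy model. It assumes, as the Java KafkaConsumer does, that when poll() times out the outstanding fetch request is not cancelled; its response is kept and handed out by a later poll(). The class SimulatedConsumer is hypothetical and models only timing, not the real client.

```python
class SimulatedConsumer:
    """Toy timing model of a consumer with one outstanding fetch request.

    Assumption: a poll() timeout does NOT cancel the in-flight fetch; the
    broker's response is kept and returned by a later poll() call.
    """

    def __init__(self, fetch_response_at_ms, records):
        self.now_ms = 0
        self.fetch_response_at_ms = fetch_response_at_ms  # when the broker answers
        self.pending_records = list(records)              # what the broker returns

    def poll(self, timeout_ms):
        deadline = self.now_ms + timeout_ms
        if self.pending_records and self.fetch_response_at_ms <= deadline:
            # The response lands before the deadline: return as soon as it arrives.
            self.now_ms = max(self.now_ms, self.fetch_response_at_ms)
            records, self.pending_records = self.pending_records, []
            return records
        # Nothing arrived in time: return an empty batch; the fetch stays in flight.
        self.now_ms = deadline
        return []


# Question's scenario: broker holds the fetch for 400 ms, poll timeout is 200 ms.
consumer = SimulatedConsumer(fetch_response_at_ms=400, records=["record-1"])
first = consumer.poll(200)   # [] at t=200 ms; your code can do other work now
second = consumer.poll(200)  # ["record-1"] at t=400 ms
```

In this model the 200 ms poll timeout changes how often control returns to your code, not when the broker answers, which is the sense in which the two settings are independent.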

So the poll() timeout only controls how long poll() waits for records before handing control back to your code, and calling poll() regularly is what keeps your consumer from being considered dead. fetch.max.wait.ms, on the other hand, tells the broker how long it may hold a fetch request. In other words, there is no inherent dependency between the two parameters; poll() is more of a way to structure asynchronous processing in your code.

The timeout applies purely to poll().
