Apache Kafka: Understanding the relationship between the timeout parameter of the poll() method and fetch.max.wait.ms

Question

I want to understand the relationship between the timeout passed to the poll() method and the configuration fetch.max.wait.ms. Let's say I have the following configuration:

          fetch.min.bytes= 50
          fetch.max.wait.ms= 400 ms
          timeout on poll method= 200 ms

Suppose I call the poll method with the timeout specified above. The consumer sends a fetch request to the Kafka broker that is the leader for that partition. The broker does not have enough bytes to send according to fetch.min.bytes, so it waits up to 400 milliseconds for enough data to arrive before responding. But I have set the poll timeout to 200 ms. Does that mean that, under the hood, the consumer waits only 200 ms for the server to respond to the fetch request and then terminates the connection?

Is that how it works? If so, would it be safe to say that you should always configure the timeout as follows?

         timeout >= network latency + fetch.max.wait.ms

Also, does the Kafka consumer fetch records proactively? By that I mean: while the user code is busy processing the records returned by the last poll() call, is the consumer fetching more records under the hood, so as to reduce latency the next time poll() is called? If yes, how does it maintain this internal buffer? Can we also configure the maximum size of this internal buffer?

Thanks in advance.

Recommended answer

The timeout on poll() bounds how long the call blocks, which allows you to do asynchronous processing between polls. After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer availability.

As long as the consumer continues to call poll, it will stay in the group and continue to receive messages from the partitions it was assigned.

Under the hood, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.

But we should be careful that the long value in poll(long) is not too long. With a large timeout the call blocks for that long whenever no data is available, which makes the whole process effectively synchronous. You can read the discussion at the link below.

https://github.com/confluentinc/confluent-kafka-dotnet/issues/142

fetch.max.wait.ms is the maximum amount of time the server will block before answering a fetch request. The wait only kicks in if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.

Point 1: When a fetch request arrives and fewer than 50 bytes are available, the server blocks the request for up to 400 ms, and answers sooner if 50 bytes accumulate in the meantime.

fetch.min.bytes= 50
fetch.max.wait.ms= 400 ms
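
For reference, the two settings from Point 1 could be written as consumer properties roughly as follows. This is a minimal sketch: only the property keys and values come from the question, while the class name FetchSettings and its members are hypothetical names chosen for illustration. Note that the value of fetch.max.wait.ms is a plain number of milliseconds, and that the poll timeout is not a property at all but an argument passed to poll().

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper class; only the property keys and values come from the question.
class FetchSettings {
    // The poll timeout is an argument to poll(), not a consumer property.
    static final Duration POLL_TIMEOUT = Duration.ofMillis(200);

    static Properties consumerProps() {
        Properties props = new Properties();
        // The broker holds the fetch response until at least this many bytes are available...
        props.put("fetch.min.bytes", "50");
        // ...or until this many milliseconds have passed, whichever comes first.
        props.put("fetch.max.wait.ms", "400");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps() + " pollTimeout=" + POLL_TIMEOUT.toMillis() + "ms");
    }
}
```

A real consumer would also need settings such as bootstrap.servers, group.id and the deserializers, which are left out here because the question does not give them.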

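Point 2: With a 200 ms timeout, poll() returns at least every 200 ms even when no data has arrived, so your loop keeps calling poll() and the consumer stays active in the group, which avoids a rebalance by Kafka. (Clients older than 0.10.1 sent heartbeats only from inside poll(); newer clients send them from a background thread at heartbeat.interval.ms and use max.poll.interval.ms to detect a stalled poll loop.)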

timeout on poll method= 200 ms

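When Point 1 happens the broker is holding your fetch request, but because of Point 2 poll() returns after 200 ms with no records instead of waiting for the response. The fetch request stays in flight and the connection is not terminated; the response is picked up by a later poll() call. No rebalance occurs, and you may perform other tasks between polls.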

So the timeout on poll() only controls how long your code blocks waiting for records (and, by keeping the poll loop running, ensures that your consumer is not considered dead), while fetch.max.wait.ms tells the server how long it may hold a fetch request. What I mean to say is that there is no inherent dependency between the two parameters. poll() with a short timeout is more of an asynchronous way of doing things in your code.

The timeout applies only to poll().
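
To make the independence of the two settings concrete, here is a toy timing model in plain Java. It is not Kafka code and does not use the Kafka client: the class PollTimingModel and its methods are hypothetical, and the model assumes a single fetch request sent at time 0, zero network latency, and poll() calls issued back to back. It only illustrates that the broker decides when to answer (fetch.min.bytes or fetch.max.wait.ms), while the poll timeout decides how many poll() calls return empty before one of them sees the answer.

```java
// Toy model only; names and simplifications are assumptions, not Kafka internals.
class PollTimingModel {
    /** Time in ms after the fetch was sent at which the broker answers. */
    static long brokerResponseAtMs(long dataReadyAtMs, long fetchMaxWaitMs) {
        // The broker answers as soon as fetch.min.bytes is satisfied,
        // or when fetch.max.wait.ms expires, whichever is earlier.
        return Math.min(dataReadyAtMs, fetchMaxWaitMs);
    }

    /** 1-based index of the back-to-back poll() call that sees the response. */
    static int pollCallThatSeesResponse(long responseAtMs, long pollTimeoutMs) {
        int call = 1;
        long now = 0;
        // A poll() whose timeout ends before the response arrives returns empty;
        // the fetch stays in flight and the next poll() keeps waiting for it.
        while (now + pollTimeoutMs < responseAtMs) {
            now += pollTimeoutMs;
            call++;
        }
        return call;
    }

    public static void main(String[] args) {
        // Values from the question, plus an assumed data arrival time of 300 ms.
        long responseAt = brokerResponseAtMs(300, 400);
        int call = pollCallThatSeesResponse(responseAt, 200);
        System.out.println("broker responds at " + responseAt + " ms, seen by poll() call #" + call);
    }
}
```

In this model, shortening the poll timeout only increases the number of empty poll() results; it never changes when the broker answers.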
