fetch.max.wait.ms vs parameter to poll() method

Question

Before I ask my question, I'd like to point out that a similar question has been asked here but it hasn't been answered so I am asking again. Please do not mark this as duplicate as the previously mentioned question doesn't have any answers.

I have a question about fetch.max.wait.ms and consumer.poll(<value>). This is what I found while researching these settings:

The poll() method takes a timeout parameter. This specifies how long poll will wait before returning, with or without data.

If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.

So my question is: what happens when fetch.max.wait.ms=500, consumer.poll(200), and fetch.min.bytes=500, but the broker does not have as much data as fetch.min.bytes requires?

Recommended Answer

fetch.min.bytes

This property allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available (but no longer than fetch.max.wait.ms) before sending the records back to the consumer.

fetch.max.wait.ms

This sets the maximum time the broker will wait before responding to a fetch request when it does not yet have fetch.min.bytes of data available. The default is 500 ms.

Example: If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.
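
To make the "whichever happens first" rule concrete, here is a toy Java model of the broker's decision for a single fetch request. It is a simplification under stated assumptions (new data arrives in discrete batches at known times, no network latency, a single partition); the class, method, and helper names are hypothetical, and this is not the broker's actual implementation.

```java
import java.util.List;

/**
 * Toy model (an assumption, not broker code) of when a broker answers one fetch
 * request. Times are in ms, measured from the moment the request arrives.
 */
class BrokerFetchWait {

    /** A batch of new bytes that becomes available at a given time (hypothetical helper). */
    static final class Arrival {
        final long atMs;
        final int bytes;

        Arrival(long atMs, int bytes) {
            this.atMs = atMs;
            this.bytes = bytes;
        }
    }

    /**
     * Returns when the broker responds: as soon as at least minBytes are available,
     * or at maxWaitMs, whichever comes first. Arrivals must be sorted by atMs.
     */
    static long responseTimeMs(int bytesAtRequest, List<Arrival> arrivals,
                               int minBytes, long maxWaitMs) {
        long available = bytesAtRequest;
        if (available >= minBytes) {
            return 0; // enough data already: respond immediately
        }
        for (Arrival a : arrivals) {
            if (a.atMs >= maxWaitMs) {
                break; // arrives too late; the wait limit expires first
            }
            available += a.bytes;
            if (available >= minBytes) {
                return a.atMs; // fetch.min.bytes reached first
            }
        }
        return maxWaitMs; // fetch.max.wait.ms reached first: respond with what is there
    }

    public static void main(String[] args) {
        int oneMb = 1024 * 1024;
        // 10 KB every 20 ms never adds up to 1 MB within 100 ms
        List<Arrival> trickle = List.of(new Arrival(20, 10_240), new Arrival(40, 10_240),
                new Arrival(60, 10_240), new Arrival(80, 10_240));
        System.out.println(responseTimeMs(0, trickle, oneMb, 100));
        // 1 MB shows up 30 ms after the request
        System.out.println(responseTimeMs(0, List.of(new Arrival(30, oneMb)), oneMb, 100));
    }
}
```

Running main prints 100 (the wait limit wins for the slow trickle) and then 30 (the 1 MB burst satisfies fetch.min.bytes first).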

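These two parameters control when the broker responds to the consumer's fetch request. The consumer sends both values to the broker with every fetch request; the poll() timeout, by contrast, is never sent to the broker.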
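
For reference, the settings from the question could be expressed as consumer properties roughly like this. It is a sketch: the broker address and group id are placeholders, and the class and method names are hypothetical. Note that the poll() timeout is not a configuration property at all; it is passed to poll() on each call.

```java
import java.util.Properties;

class FetchConfig {

    /** Consumer settings from the question; bootstrap.servers and group.id are placeholders. */
    static Properties questionSettings() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "demo-group");              // placeholder group
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // broker answers a fetch once 500 bytes are available...
        props.setProperty("fetch.min.bytes", "500");
        // ...or after 500 ms, whichever comes first
        props.setProperty("fetch.max.wait.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = questionSettings();
        System.out.println(props.getProperty("fetch.min.bytes") + " / "
                + props.getProperty("fetch.max.wait.ms"));
    }
}
```

With kafka-clients on the classpath, these properties can be passed to new KafkaConsumer<String, String>(props); the timeout is then supplied per call, e.g. consumer.poll(Duration.ofMillis(200)) in recent clients, where poll(long) is deprecated.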

poll(timeout)

The timeout passed to poll() controls how long poll() will block when the consumer has no records ready to return.

poll() runs on the consumer side and returns records that the broker has already sent back. It calls fetcher.fetchedRecords(): if completed fetch responses are already buffered in the consumer, it returns them immediately. Otherwise it sends new fetch requests (which the broker answers according to fetch.min.bytes and fetch.max.wait.ms) and blocks for up to the given timeout; if no fetch response arrives in that time, it returns an empty result.

This can be seen in the pollForFetches method of the KafkaConsumer class:

```java
// Excerpt from KafkaConsumer (Apache Kafka client). It relies on private fields
// (time, coordinator, fetcher, client, ...) and does not compile on its own.
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
    final long startMs = time.milliseconds();
    // never block past the caller's timeout or the next time the coordinator
    // needs a poll (e.g. heartbeat or auto-commit)
    long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

    // if data is available already, return it immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in the poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    // block until a fetch response is buffered or pollTimeout elapses
    client.poll(pollTimeout, startMs, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.rejoinNeededOrPending()) {
        return Collections.emptyMap();
    }

    return fetcher.fetchedRecords();
}
```

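If fetch.min.bytes=500 and fetch.max.wait.ms=500, the broker responds to a fetch request when it has 500 bytes of data to return or after 500 ms, whichever happens first. In the scenario from the question (fewer than 500 bytes available), the broker therefore answers after about 500 ms. On the consumer side, poll(200) gives up after 200 ms and returns an empty result, but the fetch request stays in flight and is not resent ("won't resend pending fetches" in the code above). If the application calls poll(200) in a loop, the first two calls return empty, and the call that is running when the broker's response arrives (at roughly 500 ms) returns whatever data the broker sent. If the broker had no data at all, that response is empty and the running poll() keeps waiting, sending a new fetch, until its own 200 ms timeout expires. In short, the poll() timeout only bounds how long the application thread blocks; it does not shorten the broker's wait.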
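
The scenario above can be illustrated with a small Java simulation. It is a toy model, not Kafka code: it assumes zero network latency, no rebalances, that each poll() starts as soon as the previous one returns, and that the broker holds some data but fewer than fetch.min.bytes, so it always answers after fetch.max.wait.ms. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class PollTimeline {

    /**
     * Simulates consecutive poll(pollTimeoutMs) calls against a broker that always
     * answers after fetchMaxWaitMs. Returns one line per call: start, end, outcome.
     */
    static List<String> simulate(long pollTimeoutMs, long fetchMaxWaitMs, int polls) {
        List<String> log = new ArrayList<>();
        long now = 0;
        long responseAt = -1; // -1 means no fetch request is outstanding
        for (int i = 0; i < polls; i++) {
            long start = now;
            if (responseAt < 0) {
                // like fetcher.sendFetches(): only send a fetch if none is pending
                responseAt = now + fetchMaxWaitMs;
            }
            long deadline = start + pollTimeoutMs;
            if (responseAt <= deadline) {
                // broker response lands before this poll's timeout: return its records
                now = responseAt;
                responseAt = -1;
                log.add(start + "-" + now + " ms: records");
            } else {
                // timeout expires first: return empty, leave the fetch in flight
                now = deadline;
                log.add(start + "-" + now + " ms: empty");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // the question's case: poll(200) with fetch.max.wait.ms=500
        simulate(200, 500, 4).forEach(System.out::println);
    }
}
```

Running main prints "0-200 ms: empty", "200-400 ms: empty", "400-500 ms: records", and "500-700 ms: empty": the third poll returns as soon as the broker answers at 500 ms, and the fourth starts a new fetch that will again wait up to 500 ms on the broker.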
