Kafka consumer.poll returns no data

Problem description

I have two Kafka (2.11-0.11.0.1) brokers. The default replication factor for topics is set to 2. Producers write data only to partition 0.

I also have a scheduled executor that runs the task periodically. When the task consumes a topic with a small number of records (about 100 per minute), it works like a charm. But for large topics (about 10K records per minute), the poll method returns no data.

The task is:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public final class TopicToDbPump implements Runnable {
  private static final Logger log = LoggerFactory.getLogger(TopicToDbPump.class);
  private final String topic;
  private final TopicPartition topicPartition;
  private final Properties properties;

  public TopicToDbPump(String topic, Properties properties) {
    this.topic = topic;
    topicPartition = new TopicPartition(topic, 0);
    this.properties = properties;
  }

  @Override
  public void run() {
    try (final Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
      consumer.assign(Collections.singleton(topicPartition));
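      // readOffsetFromDb and saveData are helper methods not shown in the question.
      final long offset = readOffsetFromDb(topic);
      consumer.seek(topicPartition, offset);
      // A single poll with a 1-second timeout; an empty result ends this run.
      final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));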
      if (records.isEmpty()) {
        log.debug("No data from topic " + topic + " available");
        return;
      }
      saveData(records.records(topic));
    } catch (Throwable t) {
      log.error("Etl process " + topic + " failed with exception", t);
    }
  }
}

The consumer properties are:

"bootstrap.servers" = "host-1:9092,host-2:9092",
"group.id" = "my-group",
"enable.auto.commit" = "false",
"key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer",
"max.partition.fetch.bytes" = "50000000",
"max.poll.records" = "10000"
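For reference, the sketch below expresses the same settings as the java.util.Properties object that TopicToDbPump receives. The question does not show how its Properties are built, so the class name ConsumerProps is hypothetical. Only plain string keys are used, so the sketch needs nothing beyond the JDK.

```java
import java.util.Properties;

// Hypothetical helper: builds the consumer settings listed in the question.
final class ConsumerProps {
  private ConsumerProps() {}

  static Properties build() {
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", "host-1:9092,host-2:9092");
    p.setProperty("group.id", "my-group");
    // The task seeks to an offset it reads itself, and Kafka auto-commit is off.
    p.setProperty("enable.auto.commit", "false");
    p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Up to ~50 MB per partition per fetch, and at most 10,000 records per poll().
    p.setProperty("max.partition.fetch.bytes", "50000000");
    p.setProperty("max.poll.records", "10000");
    return p;
  }
}
```

The result would then be passed as `new TopicToDbPump("some-topic", ConsumerProps.build())`, where the topic name is a placeholder.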

What is wrong?

Recommended answer

The Kafka Consumer API does not guarantee that the first call to poll() will return any data.

The consumer first has to connect to the cluster and discover the leaders of all partitions assigned to it. As you can imagine, this can take a few seconds, so it is unlikely that data will be returned immediately. Because run() creates a new KafkaConsumer on every scheduled execution, this setup happens each time the task runs.

If the first call returns no data, call poll() again, several times if necessary.
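One way to "call poll() several times" is sketched below: it repeats a poll-like call until it returns a non-empty result or an attempt budget runs out. This is a minimal sketch, not code from the answer. The helper name RetryPoll.pollUntilNonEmpty and the attempt count in the usage comment are hypothetical. The helper is kept generic so it runs without a broker. With Kafka, the supplier would wrap consumer.poll(Duration) and the emptiness check would be ConsumerRecords::isEmpty.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper: retries a poll-like call until it yields data or attempts run out.
final class RetryPoll {
  private RetryPoll() {}

  static <R> R pollUntilNonEmpty(Supplier<R> poll, Predicate<R> isEmpty, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    R result = poll.get(); // first attempt
    // Keep polling while the result is empty and attempts remain.
    for (int attempt = 1; attempt < maxAttempts && isEmpty.test(result); attempt++) {
      result = poll.get();
    }
    return result; // may still be empty if every attempt came back empty
  }
}

// Possible use inside TopicToDbPump.run() (5 attempts is an arbitrary choice):
//   final ConsumerRecords<String, String> records = RetryPoll.pollUntilNonEmpty(
//       () -> consumer.poll(Duration.ofSeconds(1)), ConsumerRecords::isEmpty, 5);
```

The attempt budget is bounded, so a topic that really has no new records still lets the scheduled task return instead of blocking indefinitely.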
