如何在Kafka-Connect API中设置max.poll.records [英] How to set max.poll.records in Kafka-Connect API

查看:990
本文介绍了如何在Kafka-Connect API中设置max.poll.records的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用confluent-3.0.1平台并构建Kafka-Elasticsearch连接器.为此,我正在扩展SinkConnector和SinkTask(Kafka-connect API)以从Kafka获取数据.

I am using confluent-3.0.1 platform and building a Kafka-Elasticsearch connector. For this I am extending SinkConnector and SinkTask (Kafka-connect APIs) to get data from Kafka.

作为此代码的一部分,我正在扩展SinkConnector的taskConfigs方法以返回"max.poll.records",一次仅获取100条记录.但是它不起作用,我同时获取所有记录,而且我未能在规定的时间内提交偏移量.请任何人帮我配置"max.poll.records"

As part of this code i am extending taskConfigs method of SinkConnector to return "max.poll.records" to fetch only 100 records at a time. But its not working and I am getting all records at same time and I am failing to commit offsets within the stipulated time. Please can any one help me to configure "max.poll.records"

 public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }

推荐答案

在连接器配置中,您不能覆盖大多数Kafka使用者配置,例如max.poll.records.不过,您可以在带有consumer.前缀的Connect worker配置中执行此操作.

You can't override most Kafka consumer configs like max.poll.records in the connector configuration. You can do so in the Connect worker configuration though, with a consumer. prefix.

这篇关于如何在Kafka-Connect API中设置max.poll.records的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆