Java Apache Kafka Producer元数据更新程序&重试逻辑 [英] Java Apache Kafka Producer Metadata Updater & Retry Logic

查看:201
本文介绍了Java Apache Kafka Producer元数据更新程序&重试逻辑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我将Spring用于Apache Kafka,并创建了一个服务,该服务通过Spring的KafkaTemplate使用Kafka Producer(org.apache.kafka.clients.producer)将消息发送到主题.在目标Kafka群集上,我禁用了自动主题创建.使用此处列出的生产者配置组合 https://kafka.apache.org/documentation/#producerconfigs 我成功控制了请求重试的次数,重试之间的时间等.

I am using Spring for Apache Kafka and have created a service that uses a Kafka Producer (org.apache.kafka.clients.producer) via Spring's KafkaTemplate to send messages to a topic. On the target Kafka cluster I have disabled auto topic creation. Using a combination of producer configurations listed here https://kafka.apache.org/documentation/#producerconfigs I am successfully controlling how many times a request is retried, time between retries, etc.

如果我提供的主题不存在,则请求在我期望的时间内超时(达到 max.block.ms 的值).但是,在超时后,我将继续按照为 retry.backoff.ms 设置的时间间隔获取日志条目(例如下面的条目),直到达到 300000 ms/5分钟为止.达到了.

If I provide a topic that does not exist the request times out when I expect it to (upon reaching the value of max.block.ms). However, after the timeout I continue to get log entries (such as the one below) at the interval set for retry.backoff.ms until 300000 ms / 5 minutes has been reached.

我一直无法确定可以更改生产者或代理上的哪个配置属性,以阻止生产者检查5分钟以查看主题是否已创建.

I've been unable to determine which configuration property on the producer or the brokers can be changed to stop the producer from checking for 5 minutes to see if the topic has been created.

有人可以将我指向正确的设置,使我可以减少此设置,或者让它在请求超时后停止检查吗?

Can someone point me to the correct setting that will allow me to reduce this or have it stop checking once the request has timed out?

日志输入示例:

WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater: [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {<specified_topic>=UNKNOWN_TOPIC_OR_PARTITION}

使用的生产者配置:

  • delivery.timeout.ms = 5000
  • linger.ms = 1000
  • max.block.ms = 8000
  • request.timeout.ms = 4000
  • max.retry.count = 0
  • retry.backoff.ms = 2000

推荐答案

Kafka Producer在第一个send之前检索并缓存主题/分区元数据.然后,它会定期尝试刷新此元数据,每个 metadata.max.age.ms (默认= 5分钟) 好",以及每个retry.backoff.ms的无效"主题.这些元数据刷新尝试就是您在日志中观察到的.

Kafka Producer retrieves and caches topic/partition metadata before first send. It then periodically tries to refresh this metadata, every metadata.max.age.ms (default=5minutes) for "good" and every retry.backoff.ms for "invalid" topics. These metadata refresh attempts is what you're observing in the log.

为防止缓存不受控制地增长,根据这些

To prevent cache from growing uncontrollably, unused topics are dropped from it after certain period of time according to these source comments. Currently, this expiry period is hardcoded in ProducerMetadata.java to be 5 minutes.

  public class ProducerMetadata extends Metadata {
      private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
      static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
        ...

通过将生产者日志级别设置为DEBUG,您实际上可以观察到所有这些活动.

You can actually observe all this activity by setting producer log level to DEBUG.

这篇关于Java Apache Kafka Producer元数据更新程序&amp;重试逻辑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆