Java Apache Kafka Producer Metadata Updater & Retry Logic


Question

I am using Spring for Apache Kafka and have created a service that uses a Kafka Producer (org.apache.kafka.clients.producer) via Spring's KafkaTemplate to send messages to a topic. On the target Kafka cluster I have disabled auto topic creation. Using a combination of the producer configurations listed here (https://kafka.apache.org/documentation/#producerconfigs), I am successfully controlling how many times a request is retried, the time between retries, etc.

If I provide a topic that does not exist, the request times out when I expect it to (upon reaching the value of max.block.ms). However, after the timeout I continue to get log entries (such as the one below) at the interval set by retry.backoff.ms until 300000 ms / 5 minutes has elapsed.

I've been unable to determine which configuration property on the producer or the brokers can be changed to stop the producer from checking for 5 minutes to see if the topic has been created.

Can someone point me to the correct setting that will allow me to reduce this or have it stop checking once the request has timed out?

Example log entry:

WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater: [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {<specified_topic>=UNKNOWN_TOPIC_OR_PARTITION}

Producer configurations used:

  • delivery.timeout.ms = 5000
  • linger.ms = 1000
  • max.block.ms = 8000
  • request.timeout.ms = 4000
  • max.retry.count = 0
  • retry.backoff.ms = 2000
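The settings above can be assembled into a plain Properties object for the producer. A minimal sketch follows; the bootstrap.servers value is an assumption, and note that the standard Kafka producer key for the retry count is `retries` (the `max.retry.count` name listed above is not among the documented producer configs):

```java
import java.util.Properties;

public class ProducerProps {
    // Builds the producer configuration from the question as plain
    // key/value pairs, suitable for passing to a KafkaProducer or to
    // Spring's DefaultKafkaProducerFactory.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("delivery.timeout.ms", "5000");
        props.put("linger.ms", "1000");
        props.put("max.block.ms", "8000");
        props.put("request.timeout.ms", "4000");
        props.put("retries", "0"); // standard key for the retry count
        props.put("retry.backoff.ms", "2000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("max.block.ms")); // prints 8000
    }
}
```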

Answer

The Kafka producer retrieves and caches topic/partition metadata before the first send. It then periodically tries to refresh this metadata: every metadata.max.age.ms (default: 5 minutes) for "good" topics, and every retry.backoff.ms for "invalid" topics. These metadata refresh attempts are what you're observing in the log.

To prevent the cache from growing uncontrollably, unused topics are dropped from it after a certain period of time, according to these source comments. Currently, this expiry period is hardcoded in ProducerMetadata.java to 5 minutes.

  public class ProducerMetadata extends Metadata {
      private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
      static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
      // ...
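The expiry mechanism can be illustrated with a simplified, hypothetical sketch (this is not Kafka's actual implementation): each access to a topic re-stamps its timestamp, and entries idle for longer than TOPIC_EXPIRY_MS are evicted on the next sweep. This is why the "invalid" topic stops being retried after roughly 5 minutes.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicExpiryDemo {
    // Simplified illustration of the producer-metadata topic cache:
    // topics are re-stamped on access and evicted once idle for 5 minutes.
    static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private final Map<String, Long> topicAccess = new HashMap<>();

    void access(String topic, long nowMs) {
        topicAccess.put(topic, nowMs); // refresh the topic's timestamp
    }

    void expireStale(long nowMs) {
        // Drop every topic that has been idle longer than the expiry period
        topicAccess.entrySet().removeIf(e -> nowMs - e.getValue() > TOPIC_EXPIRY_MS);
    }

    boolean contains(String topic) {
        return topicAccess.containsKey(topic);
    }

    public static void main(String[] args) {
        TopicExpiryDemo cache = new TopicExpiryDemo();
        cache.access("missing-topic", 0L);
        cache.expireStale(TOPIC_EXPIRY_MS);     // exactly 5 min idle: still kept
        System.out.println(cache.contains("missing-topic")); // prints true
        cache.expireStale(TOPIC_EXPIRY_MS + 1); // past 5 min idle: evicted
        System.out.println(cache.contains("missing-topic")); // prints false
    }
}
```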

You can actually observe all of this activity by setting the producer log level to DEBUG.
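Since the question uses Spring, one way to do this (assuming a Spring Boot setup) is a logging entry in application.properties:

```properties
# Raise the Kafka client loggers to DEBUG to watch metadata refresh activity
logging.level.org.apache.kafka.clients=DEBUG
```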
