Kafka 生产者 TimeoutException: Expiring 1 record(s) [英] Kafka producer TimeoutException: Expiring 1 record(s)

查看:138
本文介绍了Kafka 生产者 TimeoutException: Expiring 1 record(s)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用带有 Spring-boot 的 Kafka:

Kafka Producer 类:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Kafka 配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

假设我在主题 my-test-topic 中发送了 10 次 1000 条消息.

Let's say I have sent 10 times 1000 messages in topic my-test-topic.

10 次中有 8 次我成功地收到了我的消费者中的所有消息,但有时我会收到以下错误:

8 out of 10 times I am successfully getting all messages in my consumer but sometimes I am getting this below error:

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] 发送带有 key='null' 和 payload='{"deviceType 的消息时抛出异常":"X","deviceKeys":[{"apiKey":"XXo"}],"devices...' to topic my-test-topic

org.apache.kafka.common.errors.TimeoutException:由于 30024 毫秒,my-test-topic-4 的 1 条记录已过期,因为批处理创建加上延迟时间

推荐答案

有 3 种可能性:

  1. 增加 request.timeout.ms - 这是 Kafka 等待整个批次在缓冲区中准备好的时间.因此,在您的情况下,如果缓冲区中的消息少于 100 000 条,则会发生超时.更多信息请访问:https://stackoverflow.com/a/34794261/2707179
  2. 减少 batch-size - 与上一点相关,它会更频繁地发送批次,但它们包含的消息会更少.
  3. 根据消息大小,您的网络可能无法跟上高负载?检查您的吞吐量是否不是瓶颈.
  1. Increase request.timeout.ms - this is the time that Kafka will wait for whole batch to be ready in buffer. So in your case if there are less than 100 000 messages in buffer, timeout will occur. More info here: https://stackoverflow.com/a/34794261/2707179
  2. Decrease batch-size - related to previous point, it will send batches more often but they will include fewer messages.
  3. Depending on message size, maybe your network cannot catch up with high load? Check if your throughput is not a bottleneck.

这篇关于Kafka 生产者 TimeoutException: Expiring 1 record(s)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆