如何修复kafka.Common.errors.TimeoutException:自批量创建以来已过1条记录过期xxx毫秒,加上延迟时间 [英] How to fix kafka.common.errors.TimeoutException: Expiring 1 record(s) xxx ms has passed since batch creation plus linger time

查看:18
本文介绍了如何修复kafka.Common.errors.TimeoutException:自批量创建以来已过1条记录过期xxx毫秒,加上延迟时间的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Kafka_2.11-2.1.1 以及使用Spring2.1.0.RELEASE的制片人。

我在向Kafka主题发送消息时使用了Spring,我的制作人生成了很多TimeoutExceptions

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time

我使用的是以下卡夫卡制作人设置

acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60
我尝试了许多组合(特别是batchSize&;lingerMs),但都不起作用。如有任何帮助,请设置上述方案的设置。

使用以下配置重试...但没有成功相同的错误

acks = 1
    batch.size = 15
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120
    retries = 1

第二次运行:

我尝试了不同的组合,但都不起作用。 因此,我认为这会是网络、SSL等方面的问题。 因此,我在运行Producer的同一台计算机上安装并运行了Kafka,即在我的本地计算机上。

我试图再次运行制作人,指向当地的卡夫卡主题。 但不走运,同样的问题。

下面是使用的配置参数。

2019-07-02 05:55:36.663  INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 0
    bootstrap.servers = [localhost:9092]
    request.timeout.ms = 60
    retries = 1
    buffer.memory = 33554432
    linger.ms = 0
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null

面临同样的错误: INBOUND_TOPIC的1条记录org.apache.kafka.common.errors.TimeoutException:即将过期--自批创建以来已过1:69毫秒,加上延迟时间

也尝试过 批次。大小为5、10和0 Linger_ms 0、5、10等。 请求超时0、45、60、120、300等

无法正常工作...相同的错误。

我还应该尝试什么,解决方案是什么?

如何避免负密钥生成

是的,我设置了本地设置并打印带有分区信息的日志,如下所示

2019-07-03 02:48:28.822 INFO 7092-[lt-Dispatcher-2]c.c.p.CompanyInfoPartiator:TOPIC:INBUND_TOPIC KEY=597736248-Entropy Cayman Solar Ltd.-NULL-NULL-NULLPartition=-1 2019-07-03 02:48:28.931错误7092-[ad|Producer-1]o.s.k.support.LoggingProducerListener:发送带有Key=‘597736248-Entropy Cayman Solar Ltd.-NULL-NULL-NULL’和payload=‘com.spgmi.ca.prescore.model.Company@8b12343’到主题INBUND_TOPIC:

的消息时引发异常 INBOUND_TOPIC的1条记录

org.apache.kafka.common.errors.TimeoutException:过期--自批创建以来已过1:104毫秒,加上延迟时间

我的主题inbound_tope有两个分区,如下所示 C:Softwarekafkakafka_2.11-2.1.1inwindows>kafka-topics.bat--DESCRIBE--动物园管理员本地主机:2181--主题入站主题 主题:入站_主题分区计数:2复制因子:1配置: 主题:入站_主题分区:0引头:0副本:0 Isr:0 主题:入站_主题分区:1个标题:0个副本:0个Isr:0

但我的制作人似乎正在尝试发送到分区=-1。

我的分区逻辑如下

int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
        logger.info("Topic : "+ topic + "	 Key = " + (String)key + " Partition = " + p );

在key上,我正在执行hashCode()。这里需要更正什么才能避免这个负数分区数?即分区=-1

我的分区键逻辑应该是什么样子?

高度评价任何帮助。

推荐答案

该错误指示某些记录放入队列的速度快于从客户端发送它们的速度。

当您的生产者发送消息时,它们被存储在缓冲区中(在将它们发送到目标Broker之前),并且记录被一起分组成批,以增加吞吐量。向批次添加新记录时,必须在request.timeout.ms控制的-可配置时间窗口内发送该记录(默认为30秒)。如果批处理在队列中的时间较长,则会抛出TimeoutException,然后批处理记录将从队列中删除,并且不会传递给代理。

增加request.timeout.ms的值应该对您有用。


如果这不起作用,您还可以尝试减少batch.size,以便更频繁地发送批(但这次将包括更少的消息),并确保将linger.ms设置为0(这是默认值)。

请注意,更改任何配置参数后,您需要重新启动您的Kafka代理。

如果您仍然收到该错误,我认为您的网络出现了问题。您是否启用了SSL?

这篇关于如何修复kafka.Common.errors.TimeoutException:自批量创建以来已过1条记录过期xxx毫秒,加上延迟时间的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆