卡夫卡忽略了生产者的`transaction.timeout.ms` [英] Kafka ignoring `transaction.timeout.ms` for producer

查看:59
本文介绍了卡夫卡忽略了生产者的`transaction.timeout.ms`的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 transaction.timeout.ms 属性将生产者配置为10秒超时.但是,交易似乎在60秒后中止,这要长得多.

I configure the producer to 10-second timeout using the transaction.timeout.ms property. However, it seems that the transaction is aborted after 60 seconds, which is much longer.

请参阅以下程序:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokerConnectionString);
properties.setProperty("transactional.id", "my-transactional-id");
properties.setProperty("transaction.timeout.ms", "5000");

// start the first producer and write one event
KafkaProducer<String, String> producer =
        new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "value"));
// note the transaction is left non-completed

// start another producer with different txn.id and write second event
properties.setProperty("transactional.id", "another-transactional-id");
KafkaProducer<String, String> producer2 =
        new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
producer2.initTransactions();
producer2.beginTransaction();
producer2.send(new ProducerRecord<>("topic", "value2"));
producer2.commitTransaction();

// consume the events using read-committed
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", brokerConnectionString);
consumerProps.setProperty("group.id", "my-group");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = 
        new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(singleton("topic"));
while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
        logger.info(record.toString());
    }
}

大约在60秒后打印 value2 ,这是 transaction.timeout.ms 参数的默认值.我对财产的理解不正确吗?

The value2 is printed after about 60 seconds, which is the default value for the transaction.timeout.ms parameter. Do I understand the property incorrectly?

推荐答案

在编写问题的过程中,我找到了答案.代理配置为每60秒检查一次超时的生产者,因此该事务在下一次检查时中止.此属性对其进行配置: transaction.abort.timed.out.transaction.cleanup.interval.ms .我在测试前就启动了经纪人,这就是为什么它总是要花费大约60秒的时间.

In the course of writing the question I found the answer. Broker is configured to check timed out producers every 60 seconds, so the transaction is aborted at next check. This property configures it: transaction.abort.timed.out.transaction.cleanup.interval.ms. I started the broker just before the test, that's why it always took about 60 seconds.

这篇关于卡夫卡忽略了生产者的`transaction.timeout.ms`的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆