Kafka Producer Thread, huge amount of threads even when no message is sent


Question

I profiled my Kafka producer Spring Boot application and found many "kafka-producer-network-thread"s running (47 in total). They never stop running, even when no data is being sent. My application looks a bit like this:

// a new KafkaSender is instantiated for every message that is sent
var kafkaSender = KafkaSender(kafkaTemplate, applicationProperties)
kafkaSender.sendToKafka(json, rs.getString("KEY"))

with the KafkaSender:

@Service
class KafkaSender(val kafkaTemplate: KafkaTemplate<String, String>, val applicationProperties: ApplicationProperties) {

    @Transactional(transactionManager = "kafkaTransactionManager")
    fun sendToKafka(message: String, stringKey: String) {
        kafkaTemplate.executeInTransaction { kt ->
            kt.send(applicationProperties.kafka.topic, System.currentTimeMillis().mod(10).toInt(),
                    System.currentTimeMillis().rem(10).toString(), message)
        }
    }

    companion object {
        val log = LoggerFactory.getLogger(KafkaSender::class.java)!!
    }
}

Since I instantiate a new KafkaSender each time I want to send a message to Kafka, I assumed a new thread would be created which then sends the message to the Kafka queue. Instead it looks like a pool of producers is built up but never cleaned up, even when none of them has anything to do.

Is this intended behaviour?

In my opinion the behaviour should be much like datasource pooling: keep the thread alive for some time, but clean it up when there is nothing to do.

Answer

When using transactions, the producer cache grows on demand and is not reduced.

If you are producing messages on a listener container (consumer) thread, there is one producer for each topic/partition/consumer group. This is required to solve the zombie-fencing problem: if a rebalance occurs and a partition moves to a different instance, the transactional id stays the same so the broker can handle the situation properly.
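
For illustration only (this sketch is not part of the original question): a minimal consume-then-produce listener of the kind the answer describes, assuming a transactional KafkaTemplate and a listener container configured for Kafka transactions. The class name, topic names and group id are made up, and the transactional.id format in the comment reflects how Spring Kafka derived it at the time (prefix + group + topic + partition).

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

// Hypothetical listener: when onMessage() publishes inside the container-managed Kafka
// transaction, Spring Kafka selects a transactional producer whose transactional.id is
// derived from the consumer group, input topic and partition
// (e.g. "<transactionIdPrefix><group>.<topic>.<partition>"), which is why one producer
// ends up cached per group/topic/partition.
@Service
class RelayListener(private val kafkaTemplate: KafkaTemplate<String, String>) {

    @KafkaListener(topics = ["input-topic"], groupId = "relay-group")
    fun onMessage(record: ConsumerRecord<String, String>) {
        // This send participates in the transaction started by the listener container.
        kafkaTemplate.send("output-topic", record.key(), record.value())
    }
}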

If you don't care about the zombie-fencing problem (and you can handle duplicate deliveries), set the producerPerConsumerPartition property to false on the DefaultKafkaProducerFactory and the number of producers will be much smaller.
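
As a rough sketch of that suggestion (assuming Spring Kafka's DefaultKafkaProducerFactory; the broker address, serializers, transaction-id prefix and bean names below are placeholders, not taken from the question):

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val props = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
        )
        val factory = DefaultKafkaProducerFactory<String, String>(props)
        factory.setTransactionIdPrefix("tx-")  // enables transactional producers
        // Do not cache one producer per group/topic/partition; only safe if duplicate
        // deliveries after a rebalance are acceptable (no zombie fencing).
        factory.setProducerPerConsumerPartition(false)
        return factory
    }

    @Bean
    fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> =
        KafkaTemplate(producerFactory)
}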
