每个Http请求有多个Kafka生产者实例 [英] Multiple Kafka Producer Instance for each Http Request
本文介绍了每个Http请求有多个Kafka生产者实例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。
如何高效地为每个HTTP请求创建新的Kafka Producer实例?
//Kafka Transaction enabled
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1-" );
@Service
public class ProducerService {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
public void postMessage(final MyUser message) {
// wrapping the send method in a transaction
this.kafkaTemplate.executeInTransaction(kafkaTemplate -> {
kafkaTemplate.send("custom", null, message);
}
}
推荐答案
请参阅DefaultKafkaProducerFactory
的javadoc。它为生产者发起的事务维护生产者缓存。
/**
* The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
* <p>
* This implementation will return the same {@link Producer} instance (if transactions are
* not enabled) for the provided {@link Map} {@code configs} and optional {@link Serializer}
* implementations on each {@link #createProducer()} invocation.
...
* Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a
* cache of producers is maintained; closing a producer returns it to the cache. The
* producers are closed and the cache is cleared when the factory is destroyed, the
* application context stopped, or the {@link #reset()} method is called.
...
*/
这篇关于每个Http请求有多个Kafka生产者实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文